See the License for the specific language governing permissions and limitations under the License. */ package job import ( "context" "errors" "fmt" "math" "sort" "strconv" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" featuregatetesting "k8s.io/component-base/featuregate/testing" metricstestutil "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/clock" clocktesting "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" ) var realClock = &clock.RealClock{} var alwaysReady = func() bool { return true } const fastSyncJobBatchPeriod = 10 * time.Millisecond const fastJobApiBackoff = 10 * time.Millisecond const fastRequeue = 10 * time.Millisecond // testFinishedAt represents time one second later than unix epoch // this will be used in various test cases where we don't want back-off to kick in var testFinishedAt = metav1.NewTime((time.Time{}).Add(time.Second)) func newJobWithName(name string, parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { j := &batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: metav1.ObjectMeta{ Name: name, UID: uuid.NewUUID(), Namespace: metav1.NamespaceDefault, }, Spec: batch.JobSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, }, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "foo": "bar", }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ {Image: "foo/bar"}, }, }, }, }, } if completionMode != "" { j.Spec.CompletionMode = &completionMode } // Special case: -1 for either completions or parallelism means leave nil (negative is not allowed // in practice by validation. if completions >= 0 { j.Spec.Completions = &completions } else { j.Spec.Completions = nil } if parallelism >= 0 { j.Spec.Parallelism = ¶llelism } else { j.Spec.Parallelism = nil } j.Spec.BackoffLimit = &backoffLimit return j } func newJob(parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { return newJobWithName("foobar", parallelism, completions, backoffLimit, completionMode) } func newControllerFromClient(ctx context.Context, t *testing.T, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) { t.Helper() return newControllerFromClientWithClock(ctx, t, kubeClient, resyncPeriod, realClock) } func newControllerFromClientWithClock(ctx context.Context, t *testing.T, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) { t.Helper() sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod()) jm, err := newControllerWithClock(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock) if err != nil { t.Fatalf("Error creating Job controller: %v", err) } jm.podControl = &controller.FakePodControl{} return jm, sharedInformers } func newPod(name string, job *batch.Job) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, UID: types.UID(name), Labels: job.Spec.Selector.MatchLabels, Namespace: job.Namespace, OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)}, }, } } // create count pods with the given phase for the given job func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod { var pods []*v1.Pod for i := 0; i < count; i++ { newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) newPod.Status = v1.PodStatus{Phase: status} newPod.Status.ContainerStatuses = []v1.ContainerStatus{ { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ FinishedAt: testFinishedAt, }, }, }, } newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer) pods = append(pods, newPod) } return pods } func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, terminatingPods, readyPods int) { for _, pod := range newPodList(pendingPods, v1.PodPending, job) { podIndexer.Add(pod) } running := newPodList(activePods, v1.PodRunning, job) for i, p := range running { if i >= readyPods { break } p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{ Type: v1.PodReady, Status: v1.ConditionTrue, }) } for _, pod := range running { podIndexer.Add(pod) } for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) { podIndexer.Add(pod) } for _, pod := range newPodList(failedPods, v1.PodFailed, job) { podIndexer.Add(pod) } terminating := newPodList(terminatingPods, v1.PodRunning, job) for _, p := range terminating { now := metav1.Now() p.DeletionTimestamp = &now } for _, pod := range terminating { podIndexer.Add(pod) } } func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status []indexPhase) { for _, s := range status { p := newPod(fmt.Sprintf("pod-%s", rand.String(10)), job) p.Status = v1.PodStatus{Phase: s.Phase} if s.Phase == v1.PodFailed || s.Phase == v1.PodSucceeded { p.Status.ContainerStatuses = []v1.ContainerStatus{ { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ FinishedAt: testFinishedAt, }, }, }, } } if s.Index != noIndex { p.Annotations = map[string]string{ batch.JobCompletionIndexAnnotation: s.Index, } p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index) } p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer) podIndexer.Add(p) } } type jobInitialStatus struct { active int succeed int failed int startTime *time.Time } func TestControllerSyncJob(t *testing.T) { _, ctx := ktesting.NewTestContext(t) jobConditionComplete := batch.JobComplete jobConditionFailed := batch.JobFailed jobConditionSuspended := batch.JobSuspended referenceTime := time.Now() testCases := map[string]struct { // job setup parallelism int32 completions int32 backoffLimit int32 deleting bool podLimit int completionMode batch.CompletionMode wasSuspended bool suspend bool podReplacementPolicy *batch.PodReplacementPolicy podFailurePolicy *batch.PodFailurePolicy initialStatus *jobInitialStatus backoffRecord *backoffRecord controllerTime *time.Time // pod setup // If a podControllerError is set, finalizers are not able to be removed. // This means that there is no status update so the counters for // failedPods and succeededPods cannot be incremented. podControllerError error pendingPods int activePods int readyPods int succeededPods int failedPods int terminatingPods int podsWithIndexes []indexPhase fakeExpectationAtCreation int32 // negative: ExpectDeletions, positive: ExpectCreations // expectations expectedCreations int32 expectedDeletions int32 expectedActive int32 expectedReady *int32 expectedSucceeded int32 expectedCompletedIdxs string expectedFailed int32 expectedTerminating *int32 expectedCondition *batch.JobConditionType expectedConditionStatus v1.ConditionStatus expectedConditionReason string expectedCreatedIndexes sets.Set[int] expectedPodPatches int // features podIndexLabelDisabled bool jobPodReplacementPolicy bool jobPodFailurePolicy bool }{ "job start": { parallelism: 2, completions: 5, backoffLimit: 6, expectedCreations: 2, expectedActive: 2, expectedReady: ptr.To[int32](0), }, "WQ job start": { parallelism: 2, completions: -1, backoffLimit: 6, expectedCreations: 2, expectedActive: 2, expectedReady: ptr.To[int32](0), }, "pending pods": { parallelism: 2, completions: 5, backoffLimit: 6, pendingPods: 2, expectedActive: 2, expectedReady: ptr.To[int32](0), }, "correct # of pods": { parallelism: 3, completions: 5, backoffLimit: 6, activePods: 3, readyPods: 2, expectedActive: 3, expectedReady: ptr.To[int32](2), }, "WQ job: correct # of pods": { parallelism: 2, completions: -1, backoffLimit: 6, activePods: 2, expectedActive: 2, expectedReady: ptr.To[int32](0), }, "too few active pods": { parallelism: 2, completions: 5, backoffLimit: 6, activePods: 1, succeededPods: 1, expectedCreations: 1, expectedActive: 2, expectedSucceeded: 1, expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, "WQ job: recreate pods when failed": { parallelism: 1, completions: -1, backoffLimit: 6, activePods: 1, failedPods: 1, podReplacementPolicy: podReplacementPolicy(batch.Failed), jobPodReplacementPolicy: true, terminatingPods: 1, expectedTerminating: ptr.To[int32](1), expectedReady: ptr.To[int32](0), // Removes finalizer and deletes one failed pod expectedPodPatches: 1, expectedFailed: 1, expectedActive: 1, }, "WQ job: turn on PodReplacementPolicy but not set PodReplacementPolicy": { parallelism: 1, completions: 1, backoffLimit: 6, activePods: 1, failedPods: 1, jobPodReplacementPolicy: true, expectedTerminating: ptr.To[int32](1), expectedReady: ptr.To[int32](0), terminatingPods: 1, expectedActive: 1, expectedPodPatches: 2, expectedFailed: 2, }, "WQ job: recreate pods when terminating or failed": { parallelism: 1, completions: -1, backoffLimit: 6, activePods: 1, failedPods: 1, podReplacementPolicy: podReplacementPolicy(batch.TerminatingOrFailed), jobPodReplacementPolicy: true, terminatingPods: 1, expectedTerminating: ptr.To[int32](1), expectedReady: ptr.To[int32](0), expectedActive: 1, expectedPodPatches: 2, expectedFailed: 2, }, "more terminating pods than parallelism": { parallelism: 1, completions: 1, backoffLimit: 6, activePods: 2, failedPods: 0, terminatingPods: 4, podReplacementPolicy: podReplacementPolicy(batch.Failed), jobPodReplacementPolicy: true, expectedTerminating: ptr.To[int32](4), expectedReady: ptr.To[int32](0), expectedActive: 1, expectedDeletions: 1, expectedPodPatches: 1, }, "more terminating pods than parallelism; PodFailurePolicy used": { // Repro for https://github.com/kubernetes/kubernetes/issues/122235 parallelism: 1, completions: 1, backoffLimit: 6, activePods: 2, failedPods: 0, terminatingPods: 4, jobPodFailurePolicy: true, podFailurePolicy: &batch.PodFailurePolicy{}, expectedTerminating: nil, expectedReady: ptr.To[int32](0), expectedActive: 1, expectedDeletions: 1, expectedPodPatches: 1, }, "too few active pods and active back-off": { parallelism: 1, completions: 1, backoffLimit: 6, backoffRecord: &backoffRecord{ failuresAfterLastSuccess: 1, lastFailureTime: &referenceTime, }, initialStatus: &jobInitialStatus{ startTime: func() *time.Time { now := time.Now() return &now }(), }, activePods: 0, succeededPods: 0, expectedCreations: 0, expectedActive: 0, expectedSucceeded: 0, expectedPodPatches: 0, expectedReady: ptr.To[int32](0), controllerTime: &referenceTime, }, "too few active pods and no back-offs": { parallelism: 1, completions: 1, backoffLimit: 6, backoffRecord: &backoffRecord{ failuresAfterLastSuccess: 0, lastFailureTime: &referenceTime, }, activePods: 0, succeededPods: 0, expectedCreations: 1, expectedActive: 1, expectedSucceeded: 0, expectedPodPatches: 0, expectedReady: ptr.To[int32](0), controllerTime: &referenceTime, }, "too few active pods with a dynamic job": { parallelism: 2, completions: -1, backoffLimit: 6, activePods: 1, expectedCreations: 1, expectedActive: 2, expectedReady: ptr.To[int32](0), }, "too few active pods, with controller error": { parallelism: 2, completions: 5, backoffLimit: 6, podControllerError: fmt.Errorf("fake error"), activePods: 1, succeededPods: 1, expectedCreations: 1, expectedActive: 1, expectedSucceeded: 0, expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, "too many active pods": { parallelism: 2, completions: 5, backoffLimit: 6, activePods: 3, expectedDeletions: 1, expectedActive: 2, expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, "too many active pods, with controller error": { parallelism: 2, completions: 5, backoffLimit: 6, podControllerError: fmt.Errorf("fake error"), activePods: 3, expectedDeletions: 0, expectedPodPatches: 1, expectedActive: 3, expectedReady: ptr.To[int32](0), }, "failed + succeed pods: reset backoff delay": { parallelism: 2, completions: 5, backoffLimit: 6, activePods: 1, succeededPods: 1, failedPods: 1, expectedCreations: 1, expectedActive: 2, expectedSucceeded: 1, expectedFailed: 1, expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, "new failed pod": { parallelism: 2, completions: 5, backoffLimit: 6, activePods: 1, failedPods: 1, expectedCreations: 1, expectedActive: 2, expectedFailed: 1, expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, "no new pod; possible finalizer update of failed pod": { parallelism: 1, completions: 1, backoffLimit: 6, initialStatus: &jobInitialStatus{ active: 1, succeed: 0, failed: 1, }, activePods: 1, failedPods: 0, expectedCreations: 0, expectedActive: 1, expectedFailed: 1, expectedPodPatches: 0, expectedReady: ptr.To[int32](0), }, "only new failed pod with controller error": { parallelism: 2, completions: 5, backoffLimit: 6, podControllerError: fmt.Errorf("fake error"), activePods: 1, failedPods: 1, expectedCreations: 1, expectedActive: 1, expectedFailed: 0, expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, "job finish": { parallelism: 2, completions: 5, backoffLimit: 6, succeededPods: 5, expectedSucceeded: 5, expectedCondition: &jobConditionComplete, expectedConditionStatus: v1.ConditionTrue, expectedPodPatches: 5, expectedReady: ptr.To[int32](0), }, "WQ job finishing": { parallelism: 2, completions: -1, backoffLimit: 6, activePods: 1, succeededPods: 1, expectedActive: 1, expectedSucceeded: 1, expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, "WQ job all finished": { parallelism: 2, completions: -1, backoffLimit: 6, succeededPods: 2, expectedSucceeded: 2, expectedCondition: &jobConditionComplete, expectedConditionStatus: v1.ConditionTrue, expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, "WQ job all finished despite one failure": { parallelism: 2, completions: -1, backoffLimit: 6, succeededPods: 1, failedPods: 1, expectedSucceeded: 1, expectedFailed: 1, expectedCondition: &jobConditionComplete, expectedConditionStatus: v1.ConditionTrue, expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, "more active pods than parallelism": { parallelism: 2, completions: 5, backoffLimit: 6, activePods: 10, expectedDeletions: 8, expectedActive: 2, expectedPodPatches: 8, expectedReady: ptr.To[int32](0), }, "more active pods than remaining completions": { parallelism: 3, completions: 4, backoffLimit: 6, activePods: 3, succeededPods: 2, expectedDeletions: 1, expectedActive: 2, expectedSucceeded: 2, expectedPodPatches: 3, expectedReady: ptr.To[int32](0), }, "status change": { parallelism: 2, completions: 5, backoffLimit: 6, activePods: 2, succeededPods: 2, expectedActive: 2, expectedSucceeded: 2, expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, "deleting job": { parallelism: 2, completions: 5, backoffLimit: 6, deleting: true, pendingPods: 1, activePods: 1, succeededPods: 1, expectedActive: 2, expectedSucceeded: 1, expectedPodPatches: 3, expectedReady: ptr.To[int32](0), }, "limited pods": { parallelism: 100, completions: 200, backoffLimit: 6, podLimit: 10, expectedCreations: 10, expectedActive: 10, expectedReady: ptr.To[int32](0), }, "too many job failures": { parallelism: 2, completions: 5, deleting: true, failedPods: 1, expectedFailed: 1, expectedCondition: &jobConditionFailed, expectedConditionStatus: v1.ConditionTrue, expectedConditionReason: "BackoffLimitExceeded", expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, "job failures, unsatisfied expectations": { parallelism: 2, completions: 5, deleting: true, failedPods: 1, fakeExpectationAtCreation: 1, expectedFailed: 1, expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, "indexed job start": { parallelism: 2, completions: 5, backoffLimit: 6, completionMode: batch.IndexedCompletion, expectedCreations: 2, expectedActive: 2, expectedCreatedIndexes: sets.New(0, 1), expectedReady: ptr.To[int32](0), }, "indexed job with some pods deleted, podReplacementPolicy Failed": { parallelism: 2, completions: 5, backoffLimit: 6, completionMode: batch.IndexedCompletion, expectedCreations: 1, expectedActive: 1, expectedCreatedIndexes: sets.New(0), podReplacementPolicy: podReplacementPolicy(batch.Failed), jobPodReplacementPolicy: true, terminatingPods: 1, expectedTerminating: ptr.To[int32](1), expectedReady: ptr.To[int32](0), }, "indexed job with some pods deleted, podReplacementPolicy TerminatingOrFailed": { parallelism: 2, completions: 5, backoffLimit: 6, completionMode: batch.IndexedCompletion, expectedCreations: 2, expectedActive: 2, expectedCreatedIndexes: sets.New(0, 1), podReplacementPolicy: podReplacementPolicy(batch.TerminatingOrFailed), jobPodReplacementPolicy: true, terminatingPods: 1, expectedTerminating: ptr.To[int32](1), expectedReady: ptr.To[int32](0), expectedPodPatches: 1, }, "indexed job completed": { parallelism: 2, completions: 3, backoffLimit: 6, completionMode: batch.IndexedCompletion, podsWithIndexes: []indexPhase{ {"0", v1.PodSucceeded}, {"1", v1.PodFailed}, {"1", v1.PodSucceeded}, {"2", v1.PodSucceeded}, }, expectedSucceeded: 3, expectedFailed: 1, expectedCompletedIdxs: "0-2", expectedCondition: &jobConditionComplete, expectedConditionStatus: v1.ConditionTrue, expectedPodPatches: 4, expectedReady: ptr.To[int32](0), }, "indexed job repeated completed index": { parallelism: 2, completions: 3, backoffLimit: 6, completionMode: batch.IndexedCompletion, podsWithIndexes: []indexPhase{ {"0", v1.PodSucceeded}, {"1", v1.PodSucceeded}, {"1", v1.PodSucceeded}, }, expectedCreations: 1, expectedActive: 1, expectedSucceeded: 2, expectedCompletedIdxs: "0,1", expectedCreatedIndexes: sets.New(2), expectedPodPatches: 3, expectedReady: ptr.To[int32](0), }, "indexed job some running and completed pods": { parallelism: 8, completions: 20, backoffLimit: 6, completionMode: batch.IndexedCompletion, podsWithIndexes: []indexPhase{ {"0", v1.PodRunning}, {"2", v1.PodSucceeded}, {"3", v1.PodPending}, {"4", v1.PodSucceeded}, {"5", v1.PodSucceeded}, {"7", v1.PodSucceeded}, {"8", v1.PodSucceeded}, {"9", v1.PodSucceeded}, }, expectedCreations: 6, expectedActive: 8, expectedSucceeded: 6, expectedCompletedIdxs: "2,4,5,7-9", expectedCreatedIndexes: sets.New(1, 6, 10, 11, 12, 13), expectedPodPatches: 6, expectedReady: ptr.To[int32](0), }, "indexed job some failed pods": { parallelism: 3, completions: 4, backoffLimit: 6, completionMode: batch.IndexedCompletion, podsWithIndexes: []indexPhase{ {"0", v1.PodFailed}, {"1", v1.PodPending}, {"2", v1.PodFailed}, }, expectedCreations: 2, expectedActive: 3, expectedFailed: 2, expectedCreatedIndexes: sets.New(0, 2), expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, "indexed job some pods without index": { parallelism: 2, completions: 5, backoffLimit: 6, completionMode: batch.IndexedCompletion, activePods: 1, succeededPods: 1, failedPods: 1, podsWithIndexes: []indexPhase{ {"invalid", v1.PodRunning}, {"invalid", v1.PodSucceeded}, {"invalid", v1.PodFailed}, {"invalid", v1.PodPending}, {"0", v1.PodSucceeded}, {"1", v1.PodRunning}, {"2", v1.PodRunning}, }, expectedDeletions: 3, expectedActive: 2, expectedSucceeded: 1, expectedFailed: 0, expectedCompletedIdxs: "0", expectedPodPatches: 8, expectedReady: ptr.To[int32](0), }, "indexed job repeated indexes": { parallelism: 5, completions: 5, backoffLimit: 6, completionMode: batch.IndexedCompletion, succeededPods: 1, failedPods: 1, podsWithIndexes: []indexPhase{ {"invalid", v1.PodRunning}, {"0", v1.PodSucceeded}, {"1", v1.PodRunning}, {"2", v1.PodRunning}, {"2", v1.PodPending}, }, expectedCreations: 0, expectedDeletions: 2, expectedActive: 2, expectedSucceeded: 1, expectedCompletedIdxs: "0", expectedPodPatches: 5, expectedReady: ptr.To[int32](0), }, "indexed job with indexes outside of range": { parallelism: 2, completions: 5, backoffLimit: 6, completionMode: batch.IndexedCompletion, podsWithIndexes: []indexPhase{ {"0", v1.PodSucceeded}, {"5", v1.PodRunning}, {"6", v1.PodSucceeded}, {"7", v1.PodPending}, {"8", v1.PodFailed}, }, expectedCreations: 0, // only one of creations and deletions can happen in a sync expectedSucceeded: 1, expectedDeletions: 2, expectedCompletedIdxs: "0", expectedActive: 0, expectedFailed: 0, expectedPodPatches: 5, expectedReady: ptr.To[int32](0), }, "suspending a job with satisfied expectations": { // Suspended Job should delete active pods when expectations are // satisfied. suspend: true, parallelism: 2, activePods: 2, // parallelism == active, expectations satisfied completions: 4, backoffLimit: 6, expectedCreations: 0, expectedDeletions: 2, expectedActive: 0, expectedCondition: &jobConditionSuspended, expectedConditionStatus: v1.ConditionTrue, expectedConditionReason: "JobSuspended", expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, "suspending a job with unsatisfied expectations": { // Unlike the previous test, we expect the controller to NOT suspend the // Job in the syncJob call because the controller will wait for // expectations to be satisfied first. The next syncJob call (not tested // here) will be the same as the previous test. suspend: true, parallelism: 2, activePods: 3, // active > parallelism, expectations unsatisfied fakeExpectationAtCreation: -1, // the controller is expecting a deletion completions: 4, backoffLimit: 6, expectedCreations: 0, expectedDeletions: 0, expectedActive: 3, expectedReady: ptr.To[int32](0), }, "resuming a suspended job": { wasSuspended: true, suspend: false, parallelism: 2, completions: 4, backoffLimit: 6, expectedCreations: 2, expectedDeletions: 0, expectedActive: 2, expectedCondition: &jobConditionSuspended, expectedConditionStatus: v1.ConditionFalse, expectedConditionReason: "JobResumed", expectedReady: ptr.To[int32](0), }, "suspending a deleted job": { // We would normally expect the active pods to be deleted (see a few test // cases above), but since this job is being deleted, we don't expect // anything changed here from before the job was suspended. The // JobSuspended condition is also missing. suspend: true, deleting: true, parallelism: 2, activePods: 2, // parallelism == active, expectations satisfied completions: 4, backoffLimit: 6, expectedCreations: 0, expectedDeletions: 0, expectedActive: 2, expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, "indexed job with podIndexLabel feature disabled": { parallelism: 2, completions: 5, backoffLimit: 6, completionMode: batch.IndexedCompletion, expectedCreations: 2, expectedActive: 2, expectedCreatedIndexes: sets.New(0, 1), podIndexLabelDisabled: true, expectedReady: ptr.To[int32](0), }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { logger, _ := ktesting.NewTestContext(t) defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodIndexLabel, !tc.podIndexLabelDisabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.jobPodReplacementPolicy)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobPodFailurePolicy)() // job manager setup clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) var fakeClock clock.WithTicker if tc.controllerTime != nil { fakeClock = clocktesting.NewFakeClock(*tc.controllerTime) } else { fakeClock = clocktesting.NewFakeClock(time.Now()) } manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady // job & pods setup job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode) job.Spec.Suspend = ptr.To(tc.suspend) if tc.jobPodReplacementPolicy { job.Spec.PodReplacementPolicy = tc.podReplacementPolicy } if tc.jobPodFailurePolicy { job.Spec.PodFailurePolicy = tc.podFailurePolicy } if tc.initialStatus != nil { startTime := metav1.Now() job.Status.StartTime = &startTime job.Status.Active = int32(tc.initialStatus.active) job.Status.Succeeded = int32(tc.initialStatus.succeed) job.Status.Failed = int32(tc.initialStatus.failed) if tc.initialStatus.startTime != nil { startTime := metav1.NewTime(*tc.initialStatus.startTime) job.Status.StartTime = &startTime } } key, err := controller.KeyFunc(job) if err != nil { t.Errorf("Unexpected error getting job key: %v", err) } if tc.backoffRecord != nil { tc.backoffRecord.key = key manager.podBackoffStore.updateBackoffRecord(*tc.backoffRecord) } if tc.fakeExpectationAtCreation < 0 { manager.expectations.ExpectDeletions(logger, key, int(-tc.fakeExpectationAtCreation)) } else if tc.fakeExpectationAtCreation > 0 { manager.expectations.ExpectCreations(logger, key, int(tc.fakeExpectationAtCreation)) } if tc.wasSuspended { job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now())) } if tc.deleting { now := metav1.Now() job.DeletionTimestamp = &now } sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.terminatingPods, tc.readyPods) setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes) actual := job manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } // run err = manager.syncJob(context.TODO(), testutil.GetKey(job, t)) // We need requeue syncJob task if podController error if tc.podControllerError != nil { if err == nil { t.Error("Syncing jobs expected to return error on podControl exception") } } else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit { if err == nil { t.Error("Syncing jobs expected to return error when reached the podControl limit") } } else if err != nil { t.Errorf("Unexpected error when syncing jobs: %v", err) } // validate created/deleted pods if int32(len(fakePodControl.Templates)) != tc.expectedCreations { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) } if tc.completionMode == batch.IndexedCompletion { checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name, tc.podIndexLabelDisabled) } else { for _, p := range fakePodControl.Templates { // Fake pod control doesn't add generate name from the owner reference. if p.GenerateName != "" { t.Errorf("Got pod generate name %s, want %s", p.GenerateName, "") } if p.Spec.Hostname != "" { t.Errorf("Got pod hostname %q, want none", p.Spec.Hostname) } } } if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) } // Each create should have an accompanying ControllerRef. if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) { t.Errorf("Unexpected number of ControllerRefs. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.ControllerRefs)) } // Make sure the ControllerRefs are correct. for _, controllerRef := range fakePodControl.ControllerRefs { if got, want := controllerRef.APIVersion, "batch/v1"; got != want { t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) } if got, want := controllerRef.Kind, "Job"; got != want { t.Errorf("controllerRef.Kind = %q, want %q", got, want) } if got, want := controllerRef.Name, job.Name; got != want { t.Errorf("controllerRef.Name = %q, want %q", got, want) } if got, want := controllerRef.UID, job.UID; got != want { t.Errorf("controllerRef.UID = %q, want %q", got, want) } if controllerRef.Controller == nil || *controllerRef.Controller != true { t.Errorf("controllerRef.Controller is not set to true") } } // validate status if actual.Status.Active != tc.expectedActive { t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) } if diff := cmp.Diff(tc.expectedReady, actual.Status.Ready); diff != "" { t.Errorf("Unexpected number of ready pods (-want,+got): %s", diff) } if actual.Status.Succeeded != tc.expectedSucceeded { t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) } if diff := cmp.Diff(tc.expectedCompletedIdxs, actual.Status.CompletedIndexes); diff != "" { t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) } if actual.Status.Failed != tc.expectedFailed { t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) } if diff := cmp.Diff(tc.expectedTerminating, actual.Status.Terminating); diff != "" { t.Errorf("Unexpected number of terminating pods (-want,+got): %s", diff) } if actual.Status.StartTime != nil && tc.suspend { t.Error("Unexpected .status.startTime not nil when suspend is true") } if actual.Status.StartTime == nil && !tc.suspend { t.Error("Missing .status.startTime") } // validate conditions if tc.expectedCondition != nil { if !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) { t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions) } } else { if cond := hasTrueCondition(actual); cond != nil { t.Errorf("Got condition %s, want none", *cond) } } if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 { t.Errorf("Unexpected conditions %v", actual.Status.Conditions) } // validate slow start expectedLimit := 0 for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ { expectedLimit += controller.SlowStartInitialBatchSize << pass } if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit { t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount) } if p := len(fakePodControl.Patches); p != tc.expectedPodPatches { t.Errorf("Got %d pod patches, want %d", p, tc.expectedPodPatches) } }) } } func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string, podIndexLabelDisabled bool) { t.Helper() gotIndexes := sets.New[int]() for _, p := range control.Templates { checkJobCompletionEnvVariable(t, &p.Spec, podIndexLabelDisabled) if !podIndexLabelDisabled { checkJobCompletionLabel(t, &p) } ix := getCompletionIndex(p.Annotations) if ix == -1 { t.Errorf("Created pod %s didn't have completion index", p.Name) } else { gotIndexes.Insert(ix) } expectedName := fmt.Sprintf("%s-%d", jobName, ix) if expectedName != p.Spec.Hostname { t.Errorf("Got pod hostname %s, want %s", p.Spec.Hostname, expectedName) } expectedName += "-" if expectedName != p.GenerateName { t.Errorf("Got pod generate name %s, want %s", p.GenerateName, expectedName) } } if diff := cmp.Diff(sets.List(wantIndexes), sets.List(gotIndexes)); diff != "" { t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff) } } func TestGetNewFinshedPods(t *testing.T) { cases := map[string]struct { job batch.Job pods []*v1.Pod expectedRmFinalizers sets.Set[string] wantSucceeded int32 wantFailed int32 }{ "some counted": { job: batch.Job{ Status: batch.JobStatus{ Succeeded: 2, Failed: 1, UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).Pod, buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodFailed).Pod, buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("f").phase(v1.PodRunning).Pod, }, wantSucceeded: 4, wantFailed: 2, }, "some uncounted": { job: batch.Job{ Status: batch.JobStatus{ Succeeded: 1, Failed: 1, UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a", "c"}, Failed: []types.UID{"e", "f"}, }, }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).Pod, buildPod().uid("b").phase(v1.PodSucceeded).Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("e").phase(v1.PodFailed).Pod, buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod, }, wantSucceeded: 4, wantFailed: 4, }, "with expected removed finalizers": { job: batch.Job{ Status: batch.JobStatus{ Succeeded: 2, Failed: 2, UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a"}, Failed: []types.UID{"d"}, }, }, }, expectedRmFinalizers: sets.New("b", "f"), pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).Pod, buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodFailed).Pod, buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod, }, wantSucceeded: 4, wantFailed: 5, }, "deleted pods": { job: batch.Job{ Status: batch.JobStatus{ Succeeded: 1, Failed: 1, UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().deletionTimestamp().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().deletionTimestamp().Pod, buildPod().uid("c").phase(v1.PodRunning).trackingFinalizer().deletionTimestamp().Pod, buildPod().uid("d").phase(v1.PodPending).trackingFinalizer().deletionTimestamp().Pod, buildPod().uid("e").phase(v1.PodRunning).deletionTimestamp().Pod, buildPod().uid("f").phase(v1.PodPending).deletionTimestamp().Pod, }, wantSucceeded: 2, wantFailed: 4, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) jobCtx := &syncJobCtx{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers} succeededPods, failedPods := getNewFinishedPods(jobCtx) succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded)) failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed)) if succeeded != tc.wantSucceeded { t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded) } if failed != tc.wantFailed { t.Errorf("getStatus reports %d succeeded pods, want %d", failed, tc.wantFailed) } }) } } func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) fakeClock := clocktesting.NewFakeClock(time.Now()) now := fakeClock.Now() minuteAgo := now.Add(-time.Minute) completedCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "", now) succeededCond := newCondition(batch.JobSuccessCriteriaMet, v1.ConditionTrue, "", "", minuteAgo) failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "", now) indexedCompletion := batch.IndexedCompletion mockErr := errors.New("mock error") cases := map[string]struct { job batch.Job pods []*v1.Pod finishedCond *batch.JobCondition expectedRmFinalizers sets.Set[string] needsFlush bool statusUpdateErr error podControlErr error wantErr error wantRmFinalizers int wantStatusUpdates []batch.JobStatus wantSucceededPodsMetric int wantFailedPodsMetric int // features enableJobBackoffLimitPerIndex bool enableJobSuccessPolicy bool }{ "no updates": {}, "new active": { job: batch.Job{ Status: batch.JobStatus{ Active: 1, }, }, needsFlush: true, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Active: 1, }, }, }, "track finished pods": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().deletionTimestamp().Pod, buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().deletionTimestamp().Pod, buildPod().uid("e").phase(v1.PodPending).trackingFinalizer().deletionTimestamp().Pod, buildPod().phase(v1.PodPending).trackingFinalizer().Pod, buildPod().phase(v1.PodRunning).trackingFinalizer().Pod, }, wantRmFinalizers: 5, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a", "c"}, Failed: []types.UID{"b", "d", "e"}, }, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Succeeded: 2, Failed: 3, }, }, wantSucceededPodsMetric: 2, wantFailedPodsMetric: 3, }, "past and new finished pods": { job: batch.Job{ Status: batch.JobStatus{ Active: 1, Succeeded: 2, Failed: 3, UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a", "e"}, Failed: []types.UID{"b", "f"}, }, }, }, pods: []*v1.Pod{ buildPod().uid("e").phase(v1.PodSucceeded).Pod, buildPod().phase(v1.PodFailed).Pod, buildPod().phase(v1.PodPending).Pod, buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, }, wantRmFinalizers: 4, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a", "c"}, Failed: []types.UID{"b", "d"}, }, Active: 1, Succeeded: 3, Failed: 4, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Active: 1, Succeeded: 5, Failed: 6, }, }, wantSucceededPodsMetric: 3, wantFailedPodsMetric: 3, }, "expecting removed finalizers": { job: batch.Job{ Status: batch.JobStatus{ Succeeded: 2, Failed: 3, UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a", "g"}, Failed: []types.UID{"b", "h"}, }, }, }, expectedRmFinalizers: sets.New("c", "d", "g", "h"), pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("e").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("g").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("h").phase(v1.PodFailed).trackingFinalizer().Pod, }, wantRmFinalizers: 4, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a", "e"}, Failed: []types.UID{"b", "f"}, }, Succeeded: 3, Failed: 4, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Succeeded: 5, Failed: 6, }, }, wantSucceededPodsMetric: 3, wantFailedPodsMetric: 3, }, "succeeding job by JobSuccessPolicy": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("c").phase(v1.PodPending).trackingFinalizer().Pod, }, finishedCond: succeededCond, wantRmFinalizers: 3, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a"}, Failed: []types.UID{"b"}, }, Conditions: []batch.JobCondition{*succeededCond}, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Succeeded: 1, Failed: 1, Conditions: []batch.JobCondition{*succeededCond, *completedCond}, CompletionTime: ptr.To(metav1.NewTime(now)), }, }, wantSucceededPodsMetric: 1, wantFailedPodsMetric: 1, enableJobSuccessPolicy: true, }, "completing job": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, }, finishedCond: completedCond, wantRmFinalizers: 2, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a"}, Failed: []types.UID{"b"}, }, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Succeeded: 1, Failed: 1, Conditions: []batch.JobCondition{*completedCond}, CompletionTime: &completedCond.LastTransitionTime, }, }, wantSucceededPodsMetric: 1, wantFailedPodsMetric: 1, }, "failing job": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("c").phase(v1.PodRunning).trackingFinalizer().Pod, }, finishedCond: failedCond, // Running pod counts as failed. wantRmFinalizers: 3, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a"}, Failed: []types.UID{"b", "c"}, }, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Succeeded: 1, Failed: 2, Conditions: []batch.JobCondition{*failedCond}, }, }, wantSucceededPodsMetric: 1, wantFailedPodsMetric: 2, }, "deleted job": { job: batch.Job{ ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &metav1.Time{}, }, Status: batch.JobStatus{ Active: 1, }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().phase(v1.PodRunning).trackingFinalizer().Pod, }, // Removing finalizer from Running pod, but doesn't count as failed. wantRmFinalizers: 3, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a"}, Failed: []types.UID{"b"}, }, Active: 1, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Active: 1, Succeeded: 1, Failed: 1, }, }, wantSucceededPodsMetric: 1, wantFailedPodsMetric: 1, }, "status update error": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, }, statusUpdateErr: mockErr, wantErr: mockErr, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a"}, Failed: []types.UID{"b"}, }, }, }, }, "pod patch errors": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, }, podControlErr: mockErr, wantErr: mockErr, wantRmFinalizers: 2, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a"}, Failed: []types.UID{"b"}, }, }, }, }, "pod patch errors with partial success": { job: batch.Job{ Status: batch.JobStatus{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"a"}, Failed: []types.UID{"b"}, }, }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, }, podControlErr: mockErr, wantErr: mockErr, wantRmFinalizers: 2, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"c"}, Failed: []types.UID{"d"}, }, Succeeded: 1, Failed: 1, }, }, }, "indexed job new successful pods": { job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](6), }, Status: batch.JobStatus{ Active: 1, }, }, pods: []*v1.Pod{ buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod, buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod, buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod, buildPod().phase(v1.PodRunning).trackingFinalizer().index("5").Pod, buildPod().phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantRmFinalizers: 4, wantStatusUpdates: []batch.JobStatus{ { Active: 1, Succeeded: 2, CompletedIndexes: "1,3", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, wantSucceededPodsMetric: 2, }, "indexed job prev successful pods outside current completions index range with no new succeeded pods": { job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](2), Parallelism: ptr.To[int32](2), }, Status: batch.JobStatus{ Active: 2, Succeeded: 1, CompletedIndexes: "3", }, }, pods: []*v1.Pod{ buildPod().phase(v1.PodRunning).trackingFinalizer().index("0").Pod, buildPod().phase(v1.PodRunning).trackingFinalizer().index("1").Pod, }, wantRmFinalizers: 0, wantStatusUpdates: []batch.JobStatus{ { Active: 2, Succeeded: 0, CompletedIndexes: "", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, }, "indexed job prev successful pods outside current completions index range with new succeeded pods in range": { job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](2), Parallelism: ptr.To[int32](2), }, Status: batch.JobStatus{ Active: 2, Succeeded: 1, CompletedIndexes: "3", }, }, pods: []*v1.Pod{ buildPod().phase(v1.PodRunning).trackingFinalizer().index("0").Pod, buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod, }, wantRmFinalizers: 1, wantStatusUpdates: []batch.JobStatus{ { Active: 2, Succeeded: 1, CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, wantSucceededPodsMetric: 1, }, "indexed job new failed pods": { job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](6), }, Status: batch.JobStatus{ Active: 1, }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().index("1").Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().index("3").Pod, buildPod().uid("c").phase(v1.PodFailed).trackingFinalizer().index("3").Pod, buildPod().uid("d").phase(v1.PodRunning).trackingFinalizer().index("5").Pod, buildPod().phase(v1.PodFailed).trackingFinalizer().Pod, }, wantRmFinalizers: 4, wantStatusUpdates: []batch.JobStatus{ { Active: 1, UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Failed: []types.UID{"a", "b", "c"}, }, }, { Active: 1, Failed: 3, UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, wantFailedPodsMetric: 3, }, "indexed job past and new pods": { job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](7), }, Status: batch.JobStatus{ Failed: 2, Succeeded: 5, CompletedIndexes: "0-2,4,6,7", }, }, pods: []*v1.Pod{ buildPod().phase(v1.PodSucceeded).index("0").Pod, buildPod().phase(v1.PodFailed).index("1").Pod, buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod, buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod, buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().index("2").Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().index("5").Pod, }, wantRmFinalizers: 4, wantStatusUpdates: []batch.JobStatus{ { Succeeded: 6, Failed: 2, CompletedIndexes: "0-4,6", UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Failed: []types.UID{"a", "b"}, }, }, { Succeeded: 6, Failed: 4, CompletedIndexes: "0-4,6", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, wantSucceededPodsMetric: 1, wantFailedPodsMetric: 2, }, "too many finished": { job: batch.Job{ Status: batch.JobStatus{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Failed: []types.UID{"a", "b"}, }, }, }, pods: func() []*v1.Pod { pods := make([]*v1.Pod, 500) for i := range pods { pods[i] = buildPod().uid(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod } pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) return pods }(), wantRmFinalizers: 499, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: func() []types.UID { uids := make([]types.UID, 499) for i := range uids { uids[i] = types.UID(strconv.Itoa(i)) } return uids }(), Failed: []types.UID{"b"}, }, Failed: 1, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Failed: []types.UID{"b"}, }, Succeeded: 499, Failed: 1, }, }, wantSucceededPodsMetric: 499, wantFailedPodsMetric: 1, }, "too many indexed finished": { job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](501), }, }, pods: func() []*v1.Pod { pods := make([]*v1.Pod, 501) for i := range pods { pods[i] = buildPod().uid(strconv.Itoa(i)).index(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod } return pods }(), wantRmFinalizers: 500, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, CompletedIndexes: "0-499", Succeeded: 500, }, }, wantSucceededPodsMetric: 500, }, "pod flips from failed to succeeded": { job: batch.Job{ Spec: batch.JobSpec{ Completions: ptr.To[int32](2), Parallelism: ptr.To[int32](2), }, Status: batch.JobStatus{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Failed: []types.UID{"a", "b"}, }, }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, finishedCond: failedCond, wantRmFinalizers: 2, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Failed: 2, Conditions: []batch.JobCondition{*failedCond}, }, }, wantFailedPodsMetric: 2, }, "indexed job with a failed pod with delayed finalizer removal; the pod is not counted": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](6), BackoffLimitPerIndex: ptr.To[int32](1), }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod, }, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, FailedIndexes: ptr.To(""), }, }, }, "indexed job with a failed pod which is recreated by a running pod; the pod is counted": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](6), BackoffLimitPerIndex: ptr.To[int32](1), }, Status: batch.JobStatus{ Active: 1, }, }, pods: []*v1.Pod{ buildPod().uid("a1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod, buildPod().uid("a2").phase(v1.PodRunning).indexFailureCount("1").trackingFinalizer().index("1").Pod, }, wantRmFinalizers: 1, wantStatusUpdates: []batch.JobStatus{ { Active: 1, UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Failed: []types.UID{"a1"}, }, FailedIndexes: ptr.To(""), }, { Active: 1, Failed: 1, UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, FailedIndexes: ptr.To(""), }, }, wantFailedPodsMetric: 1, }, "indexed job with a failed pod for a failed index; the pod is counted": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ Spec: batch.JobSpec{ CompletionMode: &indexedCompletion, Completions: ptr.To[int32](6), BackoffLimitPerIndex: ptr.To[int32](1), }, }, pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().index("1").Pod, }, wantRmFinalizers: 1, wantStatusUpdates: []batch.JobStatus{ { FailedIndexes: ptr.To("1"), UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Failed: []types.UID{"a"}, }, }, { Failed: 1, FailedIndexes: ptr.To("1"), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, wantFailedPodsMetric: 1, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)() clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, _ := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) fakePodControl := controller.FakePodControl{Err: tc.podControlErr} metrics.JobPodsFinished.Reset() manager.podControl = &fakePodControl var statusUpdates []batch.JobStatus manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { statusUpdates = append(statusUpdates, *job.Status.DeepCopy()) return job, tc.statusUpdateErr } job := tc.job.DeepCopy() if job.Status.UncountedTerminatedPods == nil { job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } jobCtx := &syncJobCtx{ job: job, pods: tc.pods, uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods), expectedRmFinalizers: tc.expectedRmFinalizers, finishedCondition: tc.finishedCond, } if isIndexedJob(job) { jobCtx.succeededIndexes = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) if tc.enableJobBackoffLimitPerIndex && job.Spec.BackoffLimitPerIndex != nil { jobCtx.failedIndexes = calculateFailedIndexes(logger, job, tc.pods) jobCtx.activePods = controller.FilterActivePods(logger, tc.pods) jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx) } } err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) } if diff := cmp.Diff(tc.wantStatusUpdates, statusUpdates, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { t.Errorf("Unexpected status updates (-want,+got):\n%s", diff) } rmFinalizers := len(fakePodControl.Patches) if rmFinalizers != tc.wantRmFinalizers { t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers) } if tc.wantErr == nil { completionMode := completionModeStr(job) v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded)) if err != nil { t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err) } if float64(tc.wantSucceededPodsMetric) != v { t.Errorf("Metric reports %.0f succeeded pods, want %d", v, tc.wantSucceededPodsMetric) } v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed)) if err != nil { t.Fatalf("Obtaining failed job_pods_finished_total: %v", err) } if float64(tc.wantFailedPodsMetric) != v { t.Errorf("Metric reports %.0f failed pods, want %d", v, tc.wantFailedPodsMetric) } } }) } } // TestSyncJobPastDeadline verifies tracking of active deadline in a single syncJob call. func TestSyncJobPastDeadline(t *testing.T) { _, ctx := ktesting.NewTestContext(t) testCases := map[string]struct { // job setup parallelism int32 completions int32 activeDeadlineSeconds int64 startTime int64 backoffLimit int32 suspend bool // pod setup activePods int succeededPods int failedPods int // expectations expectedDeletions int32 expectedActive int32 expectedSucceeded int32 expectedFailed int32 expectedCondition batch.JobConditionType expectedConditionReason string }{ "activeDeadlineSeconds less than single pod execution": { parallelism: 1, completions: 1, activeDeadlineSeconds: 10, startTime: 15, backoffLimit: 6, activePods: 1, expectedDeletions: 1, expectedFailed: 1, expectedCondition: batch.JobFailed, expectedConditionReason: batch.JobReasonDeadlineExceeded, }, "activeDeadlineSeconds bigger than single pod execution": { parallelism: 1, completions: 2, activeDeadlineSeconds: 10, startTime: 15, backoffLimit: 6, activePods: 1, succeededPods: 1, expectedDeletions: 1, expectedSucceeded: 1, expectedFailed: 1, expectedCondition: batch.JobFailed, expectedConditionReason: batch.JobReasonDeadlineExceeded, }, "activeDeadlineSeconds times-out before any pod starts": { parallelism: 1, completions: 1, activeDeadlineSeconds: 10, startTime: 10, backoffLimit: 6, expectedCondition: batch.JobFailed, expectedConditionReason: batch.JobReasonDeadlineExceeded, }, "activeDeadlineSeconds with backofflimit reach": { parallelism: 1, completions: 1, activeDeadlineSeconds: 1, startTime: 10, failedPods: 1, expectedFailed: 1, expectedCondition: batch.JobFailed, expectedConditionReason: batch.JobReasonBackoffLimitExceeded, }, "activeDeadlineSeconds is not triggered when Job is suspended": { suspend: true, parallelism: 1, completions: 2, activeDeadlineSeconds: 10, startTime: 15, backoffLimit: 6, expectedCondition: batch.JobSuspended, expectedConditionReason: "JobSuspended", }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { // job manager setup clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } // job & pods setup job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds job.Spec.Suspend = ptr.To(tc.suspend) start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0) job.Status.StartTime = &start sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0, 0) // run err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } // validate created/deleted pods if int32(len(fakePodControl.Templates)) != 0 { t.Errorf("Unexpected number of creates. Expected 0, saw %d\n", len(fakePodControl.Templates)) } if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) } // validate status if actual.Status.Active != tc.expectedActive { t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) } if actual.Status.Succeeded != tc.expectedSucceeded { t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) } if actual.Status.Failed != tc.expectedFailed { t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) } if actual.Status.StartTime == nil { t.Error("Missing .status.startTime") } // validate conditions if !getCondition(actual, tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) { t.Errorf("Expected fail condition. Got %#v", actual.Status.Conditions) } }) } } func getCondition(job *batch.Job, condition batch.JobConditionType, status v1.ConditionStatus, reason string) bool { for _, v := range job.Status.Conditions { if v.Type == condition && v.Status == status && v.Reason == reason { return true } } return false } func hasTrueCondition(job *batch.Job) *batch.JobConditionType { for _, v := range job.Status.Conditions { if v.Status == v1.ConditionTrue { return &v.Type } } return nil } // TestPastDeadlineJobFinished ensures that a Job is correctly tracked until // reaching the active deadline, at which point it is marked as Failed. func TestPastDeadlineJobFinished(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := fake.NewSimpleClientset() fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second)) manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.expectations = FakeJobExpectations{ controller.NewControllerExpectations(), true, func() { }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() sharedInformerFactory.Start(ctx.Done()) sharedInformerFactory.WaitForCacheSync(ctx.Done()) go manager.Run(ctx, 1) tests := []struct { name string setStartTime bool jobName string }{ { name: "New job created without start time being set", setStartTime: false, jobName: "job1", }, { name: "New job created with start time being set", setStartTime: true, jobName: "job2", }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { job := newJobWithName(tc.jobName, 1, 1, 6, batch.NonIndexedCompletion) job.Spec.ActiveDeadlineSeconds = ptr.To[int64](1) if tc.setStartTime { start := metav1.NewTime(fakeClock.Now()) job.Status.StartTime = &start } _, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{}) if err != nil { t.Errorf("Could not create Job: %v", err) } var j *batch.Job err = wait.PollUntilContextTimeout(ctx, 200*time.Microsecond, 3*time.Second, true, func(ctx context.Context) (done bool, err error) { j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) if err != nil { return false, err } return j.Status.StartTime != nil, nil }) if err != nil { t.Errorf("Job failed to ensure that start time was set: %v", err) } err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (done bool, err error) { j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) if err != nil { return false, nil } if getCondition(j, batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded) { if manager.clock.Since(j.Status.StartTime.Time) < time.Duration(*j.Spec.ActiveDeadlineSeconds)*time.Second { return true, errors.New("Job contains DeadlineExceeded condition earlier than expected") } return true, nil } manager.clock.Sleep(100 * time.Millisecond) return false, nil }) if err != nil { t.Errorf("Job failed to enforce activeDeadlineSeconds configuration. Expected condition with Reason 'DeadlineExceeded' was not found in %v", j.Status) } }) } } func TestSingleJobFailedCondition(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } job := newJob(1, 1, 6, batch.NonIndexedCompletion) job.Spec.ActiveDeadlineSeconds = ptr.To[int64](10) start := metav1.Unix(metav1.Now().Time.Unix()-15, 0) job.Status.StartTime = &start job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionFalse, "DeadlineExceeded", "Job was active longer than specified deadline", realClock.Now())) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } if len(fakePodControl.DeletePodName) != 0 { t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName)) } if actual == nil { t.Error("Expected job modification\n") } failedConditions := getConditionsByType(actual.Status.Conditions, batch.JobFailed) if len(failedConditions) != 1 { t.Error("Unexpected number of failed conditions\n") } if failedConditions[0].Status != v1.ConditionTrue { t.Errorf("Unexpected status for the failed condition. Expected: %v, saw %v\n", v1.ConditionTrue, failedConditions[0].Status) } } func TestSyncJobComplete(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady job := newJob(1, 1, 6, batch.NonIndexedCompletion) job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now())) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Fatalf("Unexpected error when syncing jobs %v", err) } actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name) if err != nil { t.Fatalf("Unexpected error when trying to get job from the store: %v", err) } // Verify that after syncing a complete job, the conditions are the same. if got, expected := len(actual.Status.Conditions), 1; got != expected { t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got) } } func TestSyncJobDeleted(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, _ := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, nil } job := newJob(2, 2, 6, batch.NonIndexedCompletion) err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } if len(fakePodControl.Templates) != 0 { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates)) } if len(fakePodControl.DeletePodName) != 0 { t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName)) } } func TestSyncJobWhenManagedBy(t *testing.T) { _, ctx := ktesting.NewTestContext(t) now := metav1.Now() baseJob := batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: metav1.ObjectMeta{ Name: "foobar", Namespace: metav1.NamespaceDefault, }, Spec: batch.JobSpec{ Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "foo": "bar", }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ {Image: "foo/bar"}, }, }, }, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](6), }, Status: batch.JobStatus{ Active: 1, Ready: ptr.To[int32](1), StartTime: &now, }, } testCases := map[string]struct { enableJobManagedBy bool job batch.Job wantStatus batch.JobStatus }{ "job with custom value of managedBy; feature enabled; the status is unchanged": { enableJobManagedBy: true, job: func() batch.Job { job := baseJob.DeepCopy() job.Spec.ManagedBy = ptr.To("custom-managed-by") return *job }(), wantStatus: baseJob.Status, }, "job with well known value of the managedBy; feature enabled; the status is updated": { enableJobManagedBy: true, job: func() batch.Job { job := baseJob.DeepCopy() job.Spec.ManagedBy = ptr.To(batch.JobControllerName) return *job }(), wantStatus: batch.JobStatus{ Active: 2, Ready: ptr.To[int32](0), StartTime: &now, Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, "job with custom value of managedBy; feature disabled; the status is updated": { job: func() batch.Job { job := baseJob.DeepCopy() job.Spec.ManagedBy = ptr.To("custom-managed-by") return *job }(), wantStatus: batch.JobStatus{ Active: 2, Ready: ptr.To[int32](0), StartTime: &now, Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, "job without the managedBy; feature enabled; the status is updated": { enableJobManagedBy: true, job: baseJob, wantStatus: batch.JobStatus{ Active: 2, Ready: ptr.To[int32](0), StartTime: &now, Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)() clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady job := &tc.job actual := job manager.updateStatusHandler = func(_ context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil { t.Fatalf("error %v while adding the %v job to the index", err, klog.KObj(job)) } if err := manager.syncJob(ctx, testutil.GetKey(job, t)); err != nil { t.Fatalf("error %v while reconciling the job %v", err, testutil.GetKey(job, t)) } if diff := cmp.Diff(tc.wantStatus, actual.Status); diff != "" { t.Errorf("Unexpected job status (-want,+got):\n%s", diff) } }) } } func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { _, ctx := ktesting.NewTestContext(t) now := metav1.Now() indexedCompletionMode := batch.IndexedCompletion validObjectMeta := metav1.ObjectMeta{ Name: "foobar", UID: uuid.NewUUID(), Namespace: metav1.NamespaceDefault, } validSelector := &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, } validTemplate := v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "foo": "bar", }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ {Image: "foo/bar"}, }, }, } onExitCodeRules := []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionIgnore, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{1, 2, 3}, }, }, { Action: batch.PodFailurePolicyActionFailJob, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{5, 6, 7}, }, }, } testCases := map[string]struct { enableJobPodFailurePolicy bool enablePodDisruptionConditions bool enableJobPodReplacementPolicy bool job batch.Job pods []v1.Pod wantConditions *[]batch.JobCondition wantStatusFailed int32 wantStatusActive int32 wantStatusSucceeded int32 wantStatusTerminating *int32 }{ "default handling for pod failure if the container matching the exit codes does not match the containerName restriction": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionIgnore, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ ContainerName: ptr.To("main-container"), Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{1, 2, 3}, }, }, { Action: batch.PodFailurePolicyActionFailJob, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ ContainerName: ptr.To("main-container"), Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{5, 6, 7}, }, }, }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "monitoring-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, }, }, }, { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 42, FinishedAt: testFinishedAt, }, }, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusSucceeded: 0, wantStatusFailed: 1, }, "running pod should not result in job fail based on OnExitCodes": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodRunning, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, }, }, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusFailed: 0, wantStatusSucceeded: 0, }, "fail job based on OnExitCodes": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, }, }, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", }, }, wantStatusActive: 0, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "job marked already as failure target with failed pod": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, Status: batch.JobStatus{ Conditions: []batch.JobCondition{ { Type: batch.JobFailureTarget, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, }, }, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", }, }, wantStatusActive: 0, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "job marked already as failure target with failed pod, message based on already deleted pod": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, Status: batch.JobStatus{ Conditions: []batch.JobCondition{ { Type: batch.JobFailureTarget, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container main-container for pod default/already-deleted-pod failed with exit code 5 matching FailJob rule at index 1", }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, }, }, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container main-container for pod default/already-deleted-pod failed with exit code 5 matching FailJob rule at index 1", }, }, wantStatusActive: 0, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "default handling for a failed pod when the feature is disabled even, despite matching rule": { enableJobPodFailurePolicy: false, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, FinishedAt: testFinishedAt, }, }, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "fail job with multiple pods": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodRunning, }, }, { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, }, }, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container main-container for pod default/mypod-1 failed with exit code 5 matching FailJob rule at index 1", }, }, wantStatusActive: 0, wantStatusFailed: 2, wantStatusSucceeded: 0, }, "fail indexed job based on OnExitCodes": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: &indexedCompletionMode, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, }, }, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", }, }, wantStatusActive: 0, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "fail job based on OnExitCodes with NotIn operator": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionFailJob, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpNotIn, Values: []int32{5, 6, 7}, }, }, }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 42, }, }, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container main-container for pod default/mypod-0 failed with exit code 42 matching FailJob rule at index 0", }, }, wantStatusActive: 0, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "default handling job based on OnExitCodes with NotIn operator": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionFailJob, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpNotIn, Values: []int32{5, 6, 7}, }, }, }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, FinishedAt: testFinishedAt, }, }, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "fail job based on OnExitCodes for InitContainer": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, InitContainerStatuses: []v1.ContainerStatus{ { Name: "init-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 5, }, }, }, }, ContainerStatuses: []v1.ContainerStatus{ { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 143, }, }, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container init-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", }, }, wantStatusActive: 0, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "ignore pod failure; both rules are matching, the first is executed only": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](0), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "container1", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 2, }, }, }, { Name: "container2", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 6, }, }, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusFailed: 0, wantStatusSucceeded: 0, }, "ignore pod failure based on OnExitCodes": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](0), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 1, }, }, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusFailed: 0, wantStatusSucceeded: 0, }, "default job based on OnExitCodes": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](0), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: onExitCodeRules, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 10, }, }, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonBackoffLimitExceeded, Message: "Job has reached the specified backoff limit", }, }, wantStatusActive: 0, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "count pod failure based on OnExitCodes; both rules are matching, the first is executed only": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionCount, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{1, 2}, }, }, { Action: batch.PodFailurePolicyActionIgnore, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{2, 3}, }, }, }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 2, FinishedAt: testFinishedAt, }, }, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "count pod failure based on OnPodConditions; both rules are matching, the first is executed only": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionCount, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ { Type: v1.PodConditionType("ResourceLimitExceeded"), Status: v1.ConditionTrue, }, }, }, { Action: batch.PodFailurePolicyActionIgnore, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, Conditions: []v1.PodCondition{ { Type: v1.PodConditionType("ResourceLimitExceeded"), Status: v1.ConditionTrue, }, { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, ContainerStatuses: []v1.ContainerStatus{ { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ FinishedAt: testFinishedAt, }, }, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "ignore pod failure based on OnPodConditions": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](0), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionIgnore, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, Conditions: []v1.PodCondition{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, wantConditions: nil, wantStatusActive: 1, wantStatusFailed: 0, wantStatusSucceeded: 0, }, "ignore pod failure based on OnPodConditions, ignored failures delays pod recreation": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](0), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionIgnore, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, }, }, pods: []v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &now, }, Status: v1.PodStatus{ Phase: v1.PodFailed, Conditions: []v1.PodCondition{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, wantConditions: nil, wantStatusActive: 0, wantStatusFailed: 0, wantStatusSucceeded: 0, }, "fail job based on OnPodConditions": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](1), Completions: ptr.To[int32](1), BackoffLimit: ptr.To[int32](6), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionFailJob, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, }, }, pods: []v1.Pod{ { Status: v1.PodStatus{ Phase: v1.PodFailed, Conditions: []v1.PodCondition{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, wantConditions: &[]batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", }, }, wantStatusActive: 0, wantStatusFailed: 1, wantStatusSucceeded: 0, }, "terminating Pod considered failed when PodDisruptionConditions is disabled": { enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Parallelism: ptr.To[int32](1), Selector: validSelector, Template: validTemplate, BackoffLimit: ptr.To[int32](0), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionCount, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, }, }, pods: []v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &now, }, }, }, }, "terminating Pod not considered failed when PodDisruptionConditions is enabled": { enableJobPodFailurePolicy: true, enablePodDisruptionConditions: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Parallelism: ptr.To[int32](1), Selector: validSelector, Template: validTemplate, BackoffLimit: ptr.To[int32](0), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionCount, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }, }, }, }, }, }, }, pods: []v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &now, }, Status: v1.PodStatus{ Phase: v1.PodRunning, }, }, }, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)() if tc.job.Spec.PodReplacementPolicy == nil { tc.job.Spec.PodReplacementPolicy = podReplacementPolicy(batch.Failed) } clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady job := &tc.job actual := job manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) for i, pod := range tc.pods { pod := pod pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job) if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion { pb.index(fmt.Sprintf("%v", i)) } pb = pb.trackingFinalizer() sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod) } manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if tc.wantConditions != nil { for _, wantCondition := range *tc.wantConditions { conditions := getConditionsByType(actual.Status.Conditions, wantCondition.Type) if len(conditions) != 1 { t.Fatalf("Expected a single completion condition. Got %#v for type: %q", conditions, wantCondition.Type) } condition := *conditions[0] if diff := cmp.Diff(wantCondition, condition, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { t.Errorf("Unexpected job condition (-want,+got):\n%s", diff) } } } else { if cond := hasTrueCondition(actual); cond != nil { t.Errorf("Got condition %s, want none", *cond) } } // validate status if actual.Status.Active != tc.wantStatusActive { t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.wantStatusActive, actual.Status.Active) } if actual.Status.Succeeded != tc.wantStatusSucceeded { t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.wantStatusSucceeded, actual.Status.Succeeded) } if actual.Status.Failed != tc.wantStatusFailed { t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.wantStatusFailed, actual.Status.Failed) } if ptr.Deref(actual.Status.Terminating, 0) != ptr.Deref(tc.wantStatusTerminating, 0) { t.Errorf("unexpected number of terminating pods. Expected %d, saw %d\n", ptr.Deref(tc.wantStatusTerminating, 0), ptr.Deref(actual.Status.Terminating, 0)) } }) } } func TestSyncJobWithJobSuccessPolicy(t *testing.T) { now := time.Now() validTypeMeta := metav1.TypeMeta{ APIVersion: batch.SchemeGroupVersion.String(), Kind: "Job", } validObjectMeta := metav1.ObjectMeta{ Name: "foobar", UID: uuid.NewUUID(), Namespace: metav1.NamespaceDefault, } validSelector := &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, } validTemplate := v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "foo": "bar", }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ {Image: "foobar"}, }, }, } testCases := map[string]struct { enableJobFailurePolicy bool enableBackoffLimitPerIndex bool enableJobSuccessPolicy bool job batch.Job pods []v1.Pod wantStatus batch.JobStatus }{ "job with successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": { enableJobSuccessPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](3), Completions: ptr.To[int32](3), BackoffLimit: ptr.To[int32](math.MaxInt32), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, }, }, pods: []v1.Pod{ *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, { Type: batch.JobComplete, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, "job with podFailurePolicy and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": { enableJobSuccessPolicy: true, enableJobFailurePolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{{ Action: batch.PodFailurePolicyActionFailJob, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }}, }}, }, }, }, pods: []v1.Pod{ *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, { Type: batch.JobComplete, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, "job with backoffLimitPerIndex and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": { enableJobSuccessPolicy: true, enableBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), BackoffLimitPerIndex: ptr.To[int32](2), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, }, }, pods: []v1.Pod{ *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", FailedIndexes: ptr.To(""), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, { Type: batch.JobComplete, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, "job with successPolicy; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": { enableJobSuccessPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), BackoffLimitPerIndex: ptr.To[int32](2), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, }, Status: batch.JobStatus{ Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, { Type: batch.JobComplete, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, // In the current mechanism, the job controller adds Complete condition to Job // even if some running pods still remain. // So, we need to revisit here before we graduate the JobSuccessPolicy to beta. // TODO(#123775): A Job might finish with ready!=0 // REF: https://github.com/kubernetes/kubernetes/issues/123775 "job with successPolicy; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": { enableJobSuccessPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](3), Completions: ptr.To[int32](3), BackoffLimit: ptr.To[int32](math.MaxInt32), BackoffLimitPerIndex: ptr.To[int32](3), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, }, Status: batch.JobStatus{ Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, *buildPod().uid("a2").index("1").phase(v1.PodRunning).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, { Type: batch.JobComplete, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, "job with successPolicy and podFailurePolicy; job has a failed condition when job meets to both successPolicy and podFailurePolicy": { enableJobSuccessPolicy: true, enableJobFailurePolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{{ Action: batch.PodFailurePolicyActionFailJob, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }}, }}, }, }, }, pods: []v1.Pod{ *buildPod().uid("a1").index("0").status(v1.PodStatus{ Phase: v1.PodFailed, Conditions: []v1.PodCondition{{ Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }}, }).trackingFinalizer().Pod, *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 2, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailureTarget, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", }, { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", }, }, }, }, "job with successPolicy and backoffLimitPerIndex; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": { enableJobSuccessPolicy: true, enableBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), BackoffLimitPerIndex: ptr.To[int32](1), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", FailedIndexes: ptr.To("0"), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonFailedIndexes, Message: "Job has failed indexes", }, }, }, }, "job with successPolicy and backoffLimit; job has a failed condition when job meets to both successPolicy and backoffLimit": { enableJobSuccessPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](1), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, }, }, pods: []v1.Pod{ *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, *buildPod().uid("a2").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 2, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonBackoffLimitExceeded, Message: "Job has reached the specified backoff limit", }, }, }, }, "job with successPolicy and podFailurePolicy; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets podFailurePolicy": { enableJobSuccessPolicy: true, enableJobFailurePolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededCount: ptr.To[int32](1), }}, }, PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{{ Action: batch.PodFailurePolicyActionFailJob, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }}, }}, }, }, Status: batch.JobStatus{ Failed: 0, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").status(v1.PodStatus{ Phase: v1.PodFailed, Conditions: []v1.PodCondition{{ Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }}, }).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, { Type: batch.JobComplete, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, "job with successPolicy and backoffLimitPerIndex; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": { enableJobSuccessPolicy: true, enableBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), BackoffLimitPerIndex: ptr.To[int32](1), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("1"), }}, }, }, Status: batch.JobStatus{ Failed: 0, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", FailedIndexes: ptr.To("0"), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, { Type: batch.JobComplete, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, "job with successPolicy and backoffLimit: job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": { enableJobSuccessPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](1), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, }, Status: batch.JobStatus{ Failed: 0, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, { Type: batch.JobComplete, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, }, }, }, "job with successPolicy and podFailureTarget; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": { enableJobSuccessPolicy: true, enableJobFailurePolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, CompletionMode: completionModePtr(batch.IndexedCompletion), Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), SuccessPolicy: &batch.SuccessPolicy{ Rules: []batch.SuccessPolicyRule{{ SucceededIndexes: ptr.To("0,1"), SucceededCount: ptr.To[int32](1), }}, }, PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{{ Action: batch.PodFailurePolicyActionFailJob, OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }}, }}, }, }, Status: batch.JobStatus{ Failed: 1, Succeeded: 0, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailureTarget, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").status(v1.PodStatus{ Phase: v1.PodFailed, Conditions: []v1.PodCondition{{ Type: v1.DisruptionTarget, Status: v1.ConditionTrue, }}, }).Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailureTarget, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", }, { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", }, }, }, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobFailurePolicy)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)() clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(now) _, ctx := ktesting.NewTestContext(t) manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) manager.podControl = &controller.FakePodControl{} manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady job := &tc.job actual := job manager.updateStatusHandler = func(_ context.Context, j *batch.Job) (*batch.Job, error) { actual = j return j, nil } if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil { t.Fatalf("Failed to add the Job %q to sharedInformer: %v", klog.KObj(job), err) } for i, pod := range tc.pods { pb := podBuilder{Pod: pod.DeepCopy()}.name(fmt.Sprintf("mypod-%d", i)).job(job) if isIndexedJob(job) { pb.index(strconv.Itoa(getCompletionIndex(pod.Annotations))) } if err := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod); err != nil { t.Fatalf("Failed to add the Pod %q to sharedInformer: %v", klog.KObj(pb.Pod), err) } } if err := manager.syncJob(ctx, testutil.GetKey(job, t)); err != nil { t.Fatalf("Failed to complete syncJob: %v", err) } if diff := cmp.Diff(tc.wantStatus, actual.Status, cmpopts.IgnoreFields(batch.JobStatus{}, "StartTime", "CompletionTime", "Ready"), cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { t.Errorf("Unexpectd Job status (-want,+got):\n%s", diff) } }) } } func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { _, ctx := ktesting.NewTestContext(t) now := time.Now() validObjectMeta := metav1.ObjectMeta{ Name: "foobar", UID: uuid.NewUUID(), Namespace: metav1.NamespaceDefault, } validSelector := &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, } validTemplate := v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "foo": "bar", }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ {Image: "foo/bar"}, }, }, } testCases := map[string]struct { enableJobBackoffLimitPerIndex bool enableJobPodFailurePolicy bool job batch.Job pods []v1.Pod wantStatus batch.JobStatus }{ "successful job after a single failure within index": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), }, }, pods: []v1.Pod{ *buildPod().uid("a1").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, *buildPod().uid("a2").index("0").phase(v1.PodSucceeded).indexFailureCount("1").trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 2, Terminating: ptr.To[int32](0), CompletedIndexes: "0,1", FailedIndexes: ptr.To(""), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobComplete, Status: v1.ConditionTrue, }, }, }, }, "single failed pod, not counted as the replacement pod creation is delayed": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Active: 2, Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, FailedIndexes: ptr.To(""), }, }, "single failed pod replaced already": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, *buildPod().uid("b").index("0").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Active: 2, Failed: 1, Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, FailedIndexes: ptr.To(""), }, }, "single failed index due to exceeding the backoff limit per index, the job continues": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Active: 1, Failed: 1, FailedIndexes: ptr.To("0"), Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, "single failed index due to FailIndex action, the job continues": { enableJobBackoffLimitPerIndex: true, enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionFailIndex, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{3}, }, }, }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").status(v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 3, }, }, }, }, }).indexFailureCount("0").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Active: 1, Failed: 1, FailedIndexes: ptr.To("0"), Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, "job failed index due to FailJob action": { enableJobBackoffLimitPerIndex: true, enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](6), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionFailJob, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{3}, }, }, }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").status(v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "x", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 3, }, }, }, }, }).indexFailureCount("0").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Active: 0, Failed: 1, FailedIndexes: ptr.To(""), Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailureTarget, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0", }, { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonPodFailurePolicy, Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0", }, }, }, }, "job pod failure ignored due to matching Ignore action": { enableJobBackoffLimitPerIndex: true, enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](6), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), PodFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { Action: batch.PodFailurePolicyActionIgnore, OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ Operator: batch.PodFailurePolicyOnExitCodesOpIn, Values: []int32{3}, }, }, }, }, }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").status(v1.PodStatus{ Phase: v1.PodFailed, ContainerStatuses: []v1.ContainerStatus{ { Name: "x", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: 3, }, }, }, }, }).indexFailureCount("0").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Active: 2, Failed: 0, FailedIndexes: ptr.To(""), Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, "job failed due to exceeding backoffLimit before backoffLimitPerIndex": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](1), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 2, Succeeded: 0, FailedIndexes: ptr.To(""), Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonBackoffLimitExceeded, Message: "Job has reached the specified backoff limit", }, }, }, }, "job failed due to failed indexes": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), BackoffLimit: ptr.To[int32](math.MaxInt32), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, Terminating: ptr.To[int32](0), FailedIndexes: ptr.To("0"), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonFailedIndexes, Message: "Job has failed indexes", }, }, }, }, "job failed due to exceeding max failed indexes": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](4), Completions: ptr.To[int32](4), BackoffLimit: ptr.To[int32](math.MaxInt32), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), MaxFailedIndexes: ptr.To[int32](1), }, }, pods: []v1.Pod{ *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, *buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, *buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Failed: 3, Succeeded: 1, Terminating: ptr.To[int32](0), FailedIndexes: ptr.To("0,2"), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { Type: batch.JobFailed, Status: v1.ConditionTrue, Reason: batch.JobReasonMaxFailedIndexesExceeded, Message: "Job has exceeded the specified maximal number of failed indexes", }, }, }, }, "job with finished indexes; failedIndexes are cleaned when JobBackoffLimitPerIndex disabled": { enableJobBackoffLimitPerIndex: false, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, Spec: batch.JobSpec{ Selector: validSelector, Template: validTemplate, Parallelism: ptr.To[int32](3), Completions: ptr.To[int32](3), BackoffLimit: ptr.To[int32](math.MaxInt32), CompletionMode: completionModePtr(batch.IndexedCompletion), BackoffLimitPerIndex: ptr.To[int32](1), }, Status: batch.JobStatus{ FailedIndexes: ptr.To("0"), CompletedIndexes: "1", }, }, pods: []v1.Pod{ *buildPod().uid("c").index("2").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod, }, wantStatus: batch.JobStatus{ Active: 2, Succeeded: 1, Terminating: ptr.To[int32](0), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(now) manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady job := &tc.job actual := job manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) for i, pod := range tc.pods { pod := pod pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job) if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion { pb.index(fmt.Sprintf("%v", getCompletionIndex(pod.Annotations))) } pb = pb.trackingFinalizer() sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod) } manager.syncJob(context.TODO(), testutil.GetKey(job, t)) // validate relevant fields of the status if diff := cmp.Diff(tc.wantStatus, actual.Status, cmpopts.IgnoreFields(batch.JobStatus{}, "StartTime", "CompletionTime", "Ready"), cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { t.Errorf("unexpected job status. Diff: %s\n", diff) } }) } } func TestSyncJobUpdateRequeue(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) cases := map[string]struct { updateErr error wantRequeued bool }{ "no error": { wantRequeued: false, }, "generic error": { updateErr: fmt.Errorf("update error"), wantRequeued: true, }, "conflict error": { updateErr: apierrors.NewConflict(schema.GroupResource{}, "", nil), wantRequeued: true, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff)) fakeClient := clocktesting.NewFakeClock(time.Now()) manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClient) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, tc.updateErr } job := newJob(2, 2, 6, batch.NonIndexedCompletion) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) manager.queue.Add(testutil.GetKey(job, t)) manager.processNextWorkItem(context.TODO()) if tc.wantRequeued { verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1) } else { // We advance the clock to make sure there are not items awaiting // to be added into the queue. We also sleep a little to give the // delaying queue time to move the potential items from pre-queue // into the queue asynchronously. manager.clock.Sleep(fastJobApiBackoff) time.Sleep(time.Millisecond) verifyEmptyQueue(ctx, t, manager) } }) } } func TestUpdateJobRequeue(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) cases := map[string]struct { oldJob *batch.Job updateFn func(job *batch.Job) wantRequeuedImmediately bool }{ "spec update": { oldJob: newJob(1, 1, 1, batch.IndexedCompletion), updateFn: func(job *batch.Job) { job.Spec.Suspend = ptr.To(false) job.Generation++ }, wantRequeuedImmediately: true, }, "status update": { oldJob: newJob(1, 1, 1, batch.IndexedCompletion), updateFn: func(job *batch.Job) { job.Status.StartTime = &metav1.Time{Time: time.Now()} }, wantRequeuedImmediately: false, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.oldJob) newJob := tc.oldJob.DeepCopy() if tc.updateFn != nil { tc.updateFn(newJob) } manager.updateJob(logger, tc.oldJob, newJob) gotRequeuedImmediately := manager.queue.Len() > 0 if tc.wantRequeuedImmediately != gotRequeuedImmediately { t.Fatalf("Want immediate requeue: %v, got immediate requeue: %v", tc.wantRequeuedImmediately, gotRequeuedImmediately) } }) } } func TestGetPodCreationInfoForIndependentIndexes(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) now := time.Now() clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) cases := map[string]struct { indexesToAdd []int podsWithDelayedDeletionPerIndex map[int]*v1.Pod wantIndexesToAdd []int wantRemainingTime time.Duration }{ "simple index creation": { indexesToAdd: []int{1, 3}, wantIndexesToAdd: []int{1, 3}, }, "subset of indexes can be recreated now": { indexesToAdd: []int{1, 3}, podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ 1: buildPod().indexFailureCount("0").index("1").customDeletionTimestamp(now).Pod, }, wantIndexesToAdd: []int{3}, }, "subset of indexes can be recreated now as the pods failed long time ago": { indexesToAdd: []int{1, 3}, podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ 1: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod, 3: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-DefaultJobPodFailureBackOff)).Pod, }, wantIndexesToAdd: []int{3}, }, "no indexes can be recreated now, need to wait default pod failure backoff": { indexesToAdd: []int{1, 2, 3}, podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ 1: buildPod().indexFailureCount("1").customDeletionTimestamp(now).Pod, 2: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod, 3: buildPod().indexFailureCount("2").customDeletionTimestamp(now).Pod, }, wantRemainingTime: DefaultJobPodFailureBackOff, }, "no indexes can be recreated now, need to wait but 1s already passed": { indexesToAdd: []int{1, 2, 3}, podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ 1: buildPod().indexFailureCount("1").customDeletionTimestamp(now.Add(-time.Second)).Pod, 2: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-time.Second)).Pod, 3: buildPod().indexFailureCount("2").customDeletionTimestamp(now.Add(-time.Second)).Pod, }, wantRemainingTime: DefaultJobPodFailureBackOff - time.Second, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { fakeClock := clocktesting.NewFakeClock(now) manager, _ := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) gotIndexesToAdd, gotRemainingTime := manager.getPodCreationInfoForIndependentIndexes(logger, tc.indexesToAdd, tc.podsWithDelayedDeletionPerIndex) if diff := cmp.Diff(tc.wantIndexesToAdd, gotIndexesToAdd); diff != "" { t.Fatalf("Unexpected indexes to add: %s", diff) } if diff := cmp.Diff(tc.wantRemainingTime, gotRemainingTime); diff != "" { t.Fatalf("Unexpected remaining time: %s", diff) } }) } } func TestJobPodLookup(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady testCases := []struct { job *batch.Job pod *v1.Pod expectedName string }{ // pods without labels don't match any job { job: &batch.Job{ ObjectMeta: metav1.ObjectMeta{Name: "basic"}, }, pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: metav1.NamespaceAll}, }, expectedName: "", }, // matching labels, different namespace { job: &batch.Job{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Spec: batch.JobSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, }, }, }, pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}, }, }, expectedName: "", }, // matching ns and labels returns { job: &batch.Job{ ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "ns"}, Spec: batch.JobSpec{ Selector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { Key: "foo", Operator: metav1.LabelSelectorOpIn, Values: []string{"bar"}, }, }, }, }, }, pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}, }, }, expectedName: "bar", }, } for _, tc := range testCases { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job) if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 { if got, want := len(jobs), 1; got != want { t.Errorf("len(jobs) = %v, want %v", got, want) } job := jobs[0] if tc.expectedName != job.Name { t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName) } } else if tc.expectedName != "" { t.Errorf("Expected a job %v pod %v, found none", tc.expectedName, tc.pod.Name) } } } func TestGetPodsForJob(t *testing.T) { _, ctx := ktesting.NewTestContext(t) job := newJob(1, 1, 6, batch.NonIndexedCompletion) job.Name = "test_job" otherJob := newJob(1, 1, 6, batch.NonIndexedCompletion) otherJob.Name = "other_job" cases := map[string]struct { jobDeleted bool jobDeletedInCache bool pods []*v1.Pod wantPods []string wantPodsFinalizer []string }{ "only matching": { pods: []*v1.Pod{ buildPod().name("pod1").job(job).trackingFinalizer().Pod, buildPod().name("pod2").job(otherJob).Pod, buildPod().name("pod3").ns(job.Namespace).Pod, buildPod().name("pod4").job(job).Pod, }, wantPods: []string{"pod1", "pod4"}, wantPodsFinalizer: []string{"pod1"}, }, "adopt": { pods: []*v1.Pod{ buildPod().name("pod1").job(job).Pod, buildPod().name("pod2").job(job).clearOwner().Pod, buildPod().name("pod3").job(otherJob).Pod, }, wantPods: []string{"pod1", "pod2"}, wantPodsFinalizer: []string{"pod2"}, }, "no adopt when deleting": { jobDeleted: true, jobDeletedInCache: true, pods: []*v1.Pod{ buildPod().name("pod1").job(job).Pod, buildPod().name("pod2").job(job).clearOwner().Pod, }, wantPods: []string{"pod1"}, }, "no adopt when deleting race": { jobDeleted: true, pods: []*v1.Pod{ buildPod().name("pod1").job(job).Pod, buildPod().name("pod2").job(job).clearOwner().Pod, }, wantPods: []string{"pod1"}, }, "release": { pods: []*v1.Pod{ buildPod().name("pod1").job(job).Pod, buildPod().name("pod2").job(job).clearLabels().Pod, }, wantPods: []string{"pod1"}, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { job := job.DeepCopy() if tc.jobDeleted { job.DeletionTimestamp = &metav1.Time{} } clientSet := fake.NewSimpleClientset(job, otherJob) jm, informer := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady cachedJob := job.DeepCopy() if tc.jobDeletedInCache { cachedJob.DeletionTimestamp = &metav1.Time{} } informer.Batch().V1().Jobs().Informer().GetIndexer().Add(cachedJob) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(otherJob) for _, p := range tc.pods { informer.Core().V1().Pods().Informer().GetIndexer().Add(p) } pods, err := jm.getPodsForJob(context.TODO(), job) if err != nil { t.Fatalf("getPodsForJob() error: %v", err) } got := make([]string, len(pods)) var gotFinalizer []string for i, p := range pods { got[i] = p.Name if hasJobTrackingFinalizer(p) { gotFinalizer = append(gotFinalizer, p.Name) } } sort.Strings(got) if diff := cmp.Diff(tc.wantPods, got); diff != "" { t.Errorf("getPodsForJob() returned (-want,+got):\n%s", diff) } sort.Strings(gotFinalizer) if diff := cmp.Diff(tc.wantPodsFinalizer, gotFinalizer); diff != "" { t.Errorf("Pods with finalizers (-want,+got):\n%s", diff) } }) } } func TestAddPod(t *testing.T) { t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(time.Now()) jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) pod1 := newPod("pod1", job1) pod2 := newPod("pod2", job2) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) jm.addPod(logger, pod1) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done := jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) } expectedKey, _ := controller.KeyFunc(job1) if got, want := key.(string), expectedKey; got != want { t.Errorf("queue.Get() = %v, want %v", got, want) } jm.addPod(logger, pod2) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done = jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) } expectedKey, _ = controller.KeyFunc(job2) if got, want := key.(string), expectedKey; got != want { t.Errorf("queue.Get() = %v, want %v", got, want) } } func TestAddPodOrphan(t *testing.T) { t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(time.Now()) jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" job3 := newJob(1, 1, 6, batch.NonIndexedCompletion) job3.Name = "job3" job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) pod1 := newPod("pod1", job1) // Make pod an orphan. Expect all matching controllers to be queued. pod1.OwnerReferences = nil informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) jm.addPod(logger, pod1) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) } func TestUpdatePod(t *testing.T) { t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(time.Now()) jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) pod1 := newPod("pod1", job1) pod2 := newPod("pod2", job2) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) prev := *pod1 bumpResourceVersion(pod1) jm.updatePod(logger, &prev, pod1) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done := jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) } expectedKey, _ := controller.KeyFunc(job1) if got, want := key.(string), expectedKey; got != want { t.Errorf("queue.Get() = %v, want %v", got, want) } prev = *pod2 bumpResourceVersion(pod2) jm.updatePod(logger, &prev, pod2) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done = jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) } expectedKey, _ = controller.KeyFunc(job2) if got, want := key.(string), expectedKey; got != want { t.Errorf("queue.Get() = %v, want %v", got, want) } } func TestUpdatePodOrphanWithNewLabels(t *testing.T) { t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(time.Now()) jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) pod1 := newPod("pod1", job1) pod1.OwnerReferences = nil informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) // Labels changed on orphan. Expect newly matching controllers to queue. prev := *pod1 prev.Labels = map[string]string{"foo2": "bar2"} bumpResourceVersion(pod1) jm.updatePod(logger, &prev, pod1) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) } func TestUpdatePodChangeControllerRef(t *testing.T) { t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(time.Now()) jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) pod1 := newPod("pod1", job1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) // Changed ControllerRef. Expect both old and new to queue. prev := *pod1 prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)} bumpResourceVersion(pod1) jm.updatePod(logger, &prev, pod1) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) } func TestUpdatePodRelease(t *testing.T) { t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(time.Now()) jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) pod1 := newPod("pod1", job1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) // Remove ControllerRef. Expect all matching to queue for adoption. prev := *pod1 pod1.OwnerReferences = nil bumpResourceVersion(pod1) jm.updatePod(logger, &prev, pod1) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) } func TestDeletePod(t *testing.T) { t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(time.Now()) jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) pod1 := newPod("pod1", job1) pod2 := newPod("pod2", job2) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) jm.deletePod(logger, pod1, true) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done := jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) } expectedKey, _ := controller.KeyFunc(job1) if got, want := key.(string), expectedKey; got != want { t.Errorf("queue.Get() = %v, want %v", got, want) } jm.deletePod(logger, pod2, true) verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done = jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) } expectedKey, _ = controller.KeyFunc(job2) if got, want := key.(string), expectedKey; got != want { t.Errorf("queue.Get() = %v, want %v", got, want) } } func TestDeletePodOrphan(t *testing.T) { // Disable batching of pod updates to show it does not get requeued at all t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0)) logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) jm, informer := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" job3 := newJob(1, 1, 6, batch.NonIndexedCompletion) job3.Name = "job3" job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) pod1 := newPod("pod1", job1) pod1.OwnerReferences = nil informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) jm.deletePod(logger, pod1, true) if got, want := jm.queue.Len(), 0; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } type FakeJobExpectations struct { *controller.ControllerExpectations satisfied bool expSatisfied func() } func (fe FakeJobExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool { fe.expSatisfied() return fe.satisfied } // TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods // and checking expectations. func TestSyncJobExpectations(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, nil } job := newJob(2, 2, 6, batch.NonIndexedCompletion) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) pods := newPodList(2, v1.PodPending, job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() podIndexer.Add(pods[0]) manager.expectations = FakeJobExpectations{ controller.NewControllerExpectations(), true, func() { // If we check active pods before checking expectations, the job // will create a new replica because it doesn't see this pod, but // has fulfilled its expectations. podIndexer.Add(pods[1]) }, } manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if len(fakePodControl.Templates) != 0 { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates)) } if len(fakePodControl.DeletePodName) != 0 { t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName)) } } func TestWatchJobs(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := fake.NewSimpleClientset() fakeWatch := watch.NewFake() clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil)) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var testJob batch.Job received := make(chan struct{}) // The update sent through the fakeWatcher should make its way into the workqueue, // and eventually into the syncHandler. manager.syncHandler = func(ctx context.Context, key string) error { defer close(received) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { t.Errorf("Error getting namespace/name from key %v: %v", key, err) } job, err := manager.jobLister.Jobs(ns).Get(name) if err != nil || job == nil { t.Errorf("Expected to find job under key %v: %v", key, err) return nil } if !apiequality.Semantic.DeepDerivative(*job, testJob) { t.Errorf("Expected %#v, but got %#v", testJob, *job) } return nil } // Start only the job watcher and the workqueue, send a watch event, // and make sure it hits the sync method. stopCh := make(chan struct{}) defer close(stopCh) sharedInformerFactory.Start(stopCh) go manager.Run(context.TODO(), 1) // We're sending new job to see if it reaches syncHandler. testJob.Namespace = "bar" testJob.Name = "foo" fakeWatch.Add(&testJob) t.Log("Waiting for job to reach syncHandler") <-received } func TestWatchPods(t *testing.T) { _, ctx := ktesting.NewTestContext(t) testJob := newJob(2, 2, 6, batch.NonIndexedCompletion) clientset := fake.NewSimpleClientset(testJob) fakeWatch := watch.NewFake() clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady // Put one job and one pod into the store sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(testJob) received := make(chan struct{}) // The pod update sent through the fakeWatcher should figure out the managing job and // send it into the syncHandler. manager.syncHandler = func(ctx context.Context, key string) error { ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { t.Errorf("Error getting namespace/name from key %v: %v", key, err) } job, err := manager.jobLister.Jobs(ns).Get(name) if err != nil { t.Errorf("Expected to find job under key %v: %v", key, err) } if !apiequality.Semantic.DeepDerivative(job, testJob) { t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job) close(received) return nil } close(received) return nil } // Start only the pod watcher and the workqueue, send a watch event, // and make sure it hits the sync method for the right job. stopCh := make(chan struct{}) defer close(stopCh) go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh) go manager.Run(context.TODO(), 1) pods := newPodList(1, v1.PodRunning, testJob) testPod := pods[0] testPod.Status.Phase = v1.PodFailed fakeWatch.Add(testPod) t.Log("Waiting for pod to reach syncHandler") <-received } func TestWatchOrphanPods(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) if err != nil { t.Fatalf("Error creating Job controller: %v", err) } manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady stopCh := make(chan struct{}) defer close(stopCh) podInformer := sharedInformers.Core().V1().Pods().Informer() go podInformer.Run(stopCh) cache.WaitForCacheSync(stopCh, podInformer.HasSynced) go manager.Run(context.TODO(), 1) // Create job but don't add it to the store. cases := map[string]struct { job *batch.Job inCache bool }{ "job_does_not_exist": { job: newJob(2, 2, 6, batch.NonIndexedCompletion), }, "orphan": {}, "job_finished": { job: func() *batch.Job { j := newJob(2, 2, 6, batch.NonIndexedCompletion) j.Status.Conditions = append(j.Status.Conditions, batch.JobCondition{ Type: batch.JobComplete, Status: v1.ConditionTrue, }) return j }(), inCache: true, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { if tc.inCache { if err := sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job); err != nil { t.Fatalf("Failed to insert job in index: %v", err) } t.Cleanup(func() { sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Delete(tc.job) }) } podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer() if tc.job != nil { podBuilder = podBuilder.job(tc.job) } orphanPod := podBuilder.Pod orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) if err != nil { t.Fatalf("Creating orphan pod: %v", err) } if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{}) if err != nil { return false, err } return !hasJobTrackingFinalizer(p), nil }); err != nil { t.Errorf("Waiting for Pod to get the finalizer removed: %v", err) } }) } } func bumpResourceVersion(obj metav1.Object) { ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) } func TestJobApiBackoffReset(t *testing.T) { t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff)) _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(time.Now()) manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, nil } job := newJob(1, 1, 2, batch.NonIndexedCompletion) key := testutil.GetKey(job, t) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) // error returned make the key requeued fakePodControl.Err = errors.New("Controller error") manager.queue.Add(key) manager.processNextWorkItem(context.TODO()) retries := manager.queue.NumRequeues(key) if retries != 1 { t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries) } // await for the actual requeue after processing of the pending queue is done awaitForQueueLen(ctx, t, manager, 1) // the queue is emptied on success fakePodControl.Err = nil manager.processNextWorkItem(context.TODO()) verifyEmptyQueue(ctx, t, manager) } var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{} type fakeRateLimitingQueue struct { workqueue.Interface requeues int item interface{} duration time.Duration } func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {} func (f *fakeRateLimitingQueue) Forget(item interface{}) { f.requeues = 0 } func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int { return f.requeues } func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) { f.item = item f.duration = duration } func TestJobBackoff(t *testing.T) { _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) job := newJob(1, 1, 1, batch.NonIndexedCompletion) oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) oldPod.ResourceVersion = "1" newPod := oldPod.DeepCopy() newPod.ResourceVersion = "2" testCases := map[string]struct { requeues int oldPodPhase v1.PodPhase phase v1.PodPhase wantBackoff time.Duration }{ "failure with pod updates batching": { requeues: 0, phase: v1.PodFailed, wantBackoff: syncJobBatchPeriod, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady queue := &fakeRateLimitingQueue{} manager.queue = queue sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) queue.requeues = tc.requeues newPod.Status.Phase = tc.phase oldPod.Status.Phase = v1.PodRunning if tc.oldPodPhase != "" { oldPod.Status.Phase = tc.oldPodPhase } manager.updatePod(logger, oldPod, newPod) if queue.duration != tc.wantBackoff { t.Errorf("unexpected backoff %v, expected %v", queue.duration, tc.wantBackoff) } }) } } func TestJobBackoffForOnFailure(t *testing.T) { _, ctx := ktesting.NewTestContext(t) jobConditionComplete := batch.JobComplete jobConditionFailed := batch.JobFailed jobConditionSuspended := batch.JobSuspended testCases := map[string]struct { // job setup parallelism int32 completions int32 backoffLimit int32 suspend bool // pod setup restartCounts []int32 podPhase v1.PodPhase // expectations expectedActive int32 expectedSucceeded int32 expectedFailed int32 expectedCondition *batch.JobConditionType expectedConditionReason string }{ "backoffLimit 0 should have 1 pod active": { 1, 1, 0, false, []int32{0}, v1.PodRunning, 1, 0, 0, nil, "", }, "backoffLimit 1 with restartCount 0 should have 1 pod active": { 1, 1, 1, false, []int32{0}, v1.PodRunning, 1, 0, 0, nil, "", }, "backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": { 1, 1, 1, false, []int32{1}, v1.PodRunning, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, "backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": { 1, 1, 1, false, []int32{1}, v1.PodPending, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, "too many job failures with podRunning - single pod": { 1, 5, 2, false, []int32{2}, v1.PodRunning, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, "too many job failures with podPending - single pod": { 1, 5, 2, false, []int32{2}, v1.PodPending, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, "too many job failures with podRunning - multiple pods": { 2, 5, 2, false, []int32{1, 1}, v1.PodRunning, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", }, "too many job failures with podPending - multiple pods": { 2, 5, 2, false, []int32{1, 1}, v1.PodPending, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", }, "not enough failures": { 2, 5, 3, false, []int32{1, 1}, v1.PodRunning, 2, 0, 0, nil, "", }, "suspending a job": { 2, 4, 6, true, []int32{1, 1}, v1.PodRunning, 0, 0, 0, &jobConditionSuspended, "JobSuspended", }, "finshed job": { 2, 4, 6, true, []int32{1, 1, 2, 0}, v1.PodSucceeded, 0, 4, 0, &jobConditionComplete, "", }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } // job & pods setup job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure job.Spec.Suspend = ptr.To(tc.suspend) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() for i, pod := range newPodList(len(tc.restartCounts), tc.podPhase, job) { pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}} podIndexer.Add(pod) } // run err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("unexpected error syncing job. Got %#v", err) } // validate status if actual.Status.Active != tc.expectedActive { t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) } if actual.Status.Succeeded != tc.expectedSucceeded { t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) } if actual.Status.Failed != tc.expectedFailed { t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) } // validate conditions if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) { t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions) } }) } } func TestJobBackoffOnRestartPolicyNever(t *testing.T) { _, ctx := ktesting.NewTestContext(t) jobConditionFailed := batch.JobFailed testCases := map[string]struct { // job setup parallelism int32 completions int32 backoffLimit int32 // pod setup activePodsPhase v1.PodPhase activePods int failedPods int // expectations expectedActive int32 expectedSucceeded int32 expectedFailed int32 expectedCondition *batch.JobConditionType expectedConditionReason string }{ "not enough failures with backoffLimit 0 - single pod": { 1, 1, 0, v1.PodRunning, 1, 0, 1, 0, 0, nil, "", }, "not enough failures with backoffLimit 1 - single pod": { 1, 1, 1, "", 0, 1, 1, 0, 1, nil, "", }, "too many failures with backoffLimit 1 - single pod": { 1, 1, 1, "", 0, 2, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", }, "not enough failures with backoffLimit 6 - multiple pods": { 2, 2, 6, v1.PodRunning, 1, 6, 2, 0, 6, nil, "", }, "too many failures with backoffLimit 6 - multiple pods": { 2, 2, 6, "", 0, 7, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded", }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } // job & pods setup job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) { pod.Status.ContainerStatuses = []v1.ContainerStatus{{State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ FinishedAt: testFinishedAt, }}}} podIndexer.Add(pod) } for _, pod := range newPodList(tc.activePods, tc.activePodsPhase, job) { podIndexer.Add(pod) } // run err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Fatalf("unexpected error syncing job: %#v\n", err) } // validate status if actual.Status.Active != tc.expectedActive { t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) } if actual.Status.Succeeded != tc.expectedSucceeded { t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) } if actual.Status.Failed != tc.expectedFailed { t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) } // validate conditions if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) { t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions) } }) } } func TestEnsureJobConditions(t *testing.T) { testCases := []struct { name string haveList []batch.JobCondition wantType batch.JobConditionType wantStatus v1.ConditionStatus wantReason string expectList []batch.JobCondition expectUpdate bool }{ { name: "append true condition", haveList: []batch.JobCondition{}, wantType: batch.JobSuspended, wantStatus: v1.ConditionTrue, wantReason: "foo", expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, expectUpdate: true, }, { name: "append false condition", haveList: []batch.JobCondition{}, wantType: batch.JobSuspended, wantStatus: v1.ConditionFalse, wantReason: "foo", expectList: []batch.JobCondition{}, expectUpdate: false, }, { name: "update true condition reason", haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, wantType: batch.JobSuspended, wantStatus: v1.ConditionTrue, wantReason: "bar", expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "bar", "", realClock.Now())}, expectUpdate: true, }, { name: "update true condition status", haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, wantType: batch.JobSuspended, wantStatus: v1.ConditionFalse, wantReason: "foo", expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "", realClock.Now())}, expectUpdate: true, }, { name: "update false condition status", haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "", realClock.Now())}, wantType: batch.JobSuspended, wantStatus: v1.ConditionTrue, wantReason: "foo", expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, expectUpdate: true, }, { name: "condition already exists", haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, wantType: batch.JobSuspended, wantStatus: v1.ConditionTrue, wantReason: "foo", expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, expectUpdate: false, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { gotList, isUpdated := ensureJobConditionStatus(tc.haveList, tc.wantType, tc.wantStatus, tc.wantReason, "", realClock.Now()) if isUpdated != tc.expectUpdate { t.Errorf("Got isUpdated=%v, want %v", isUpdated, tc.expectUpdate) } if len(gotList) != len(tc.expectList) { t.Errorf("got a list of length %d, want %d", len(gotList), len(tc.expectList)) } if diff := cmp.Diff(tc.expectList, gotList, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { t.Errorf("Unexpected JobCondition list: (-want,+got):\n%s", diff) } }) } } func TestFinalizersRemovedExpectations(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) if err != nil { t.Fatalf("Error creating Job controller: %v", err) } manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")} manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, nil } job := newJob(2, 2, 6, batch.NonIndexedCompletion) sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job) pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...) podInformer := sharedInformers.Core().V1().Pods().Informer() podIndexer := podInformer.GetIndexer() uids := sets.New[string]() for i := range pods { clientset.Tracker().Add(pods[i]) podIndexer.Add(pods[i]) uids.Insert(string(pods[i].UID)) } jobKey := testutil.GetKey(job, t) manager.syncJob(context.TODO(), jobKey) gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey) if len(gotExpectedUIDs) != 0 { t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", sets.List(gotExpectedUIDs)) } // Remove failures and re-sync. manager.podControl.(*controller.FakePodControl).Err = nil manager.syncJob(context.TODO(), jobKey) gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff) } stopCh := make(chan struct{}) defer close(stopCh) go sharedInformers.Core().V1().Pods().Informer().Run(stopCh) cache.WaitForCacheSync(stopCh, podInformer.HasSynced) // Make sure the first syncJob sets the expectations, even after the caches synced. gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { t.Errorf("Different expectations for removed finalizers after syncJob and cacheSync (-want,+got):\n%s", diff) } // Change pods in different ways. podsResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"} update := pods[0].DeepCopy() update.Finalizers = nil update.ResourceVersion = "1" err = clientset.Tracker().Update(podsResource, update, update.Namespace) if err != nil { t.Errorf("Removing finalizer: %v", err) } update = pods[1].DeepCopy() update.Finalizers = nil update.DeletionTimestamp = &metav1.Time{Time: time.Now()} update.ResourceVersion = "1" err = clientset.Tracker().Update(podsResource, update, update.Namespace) if err != nil { t.Errorf("Removing finalizer and setting deletion timestamp: %v", err) } // Preserve the finalizer. update = pods[2].DeepCopy() update.DeletionTimestamp = &metav1.Time{Time: time.Now()} update.ResourceVersion = "1" err = clientset.Tracker().Update(podsResource, update, update.Namespace) if err != nil { t.Errorf("Setting deletion timestamp: %v", err) } err = clientset.Tracker().Delete(podsResource, pods[3].Namespace, pods[3].Name) if err != nil { t.Errorf("Deleting pod that had finalizer: %v", err) } uids = sets.New(string(pods[2].UID)) var diff string if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) diff = cmp.Diff(uids, gotExpectedUIDs) return diff == "", nil }); err != nil { t.Errorf("Timeout waiting for expectations (-want, +got):\n%s", diff) } } func TestFinalizerCleanup(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() clientset := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) if err != nil { t.Fatalf("Error creating Job controller: %v", err) } manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady // Start the Pod and Job informers. sharedInformers.Start(ctx.Done()) sharedInformers.WaitForCacheSync(ctx.Done()) // Initialize the controller with 0 workers to make sure the // pod finalizers are not removed by the "syncJob" function. go manager.Run(ctx, 0) // Create a simple Job job := newJob(1, 1, 1, batch.NonIndexedCompletion) job, err = clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{}) if err != nil { t.Fatalf("Creating job: %v", err) } // Await for the Job to appear in the jobLister to ensure so that Job Pod gets tracked correctly. if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { job, _ := manager.jobLister.Jobs(job.GetNamespace()).Get(job.Name) return job != nil, nil }); err != nil { t.Fatalf("Waiting for Job object to appear in jobLister: %v", err) } // Create a Pod with the job tracking finalizer pod := newPod("test-pod", job) pod.Finalizers = append(pod.Finalizers, batch.JobTrackingFinalizer) pod, err = clientset.CoreV1().Pods(pod.GetNamespace()).Create(ctx, pod, metav1.CreateOptions{}) if err != nil { t.Fatalf("Creating pod: %v", err) } // Await for the Pod to appear in the podStore to ensure that the pod exists when cleaning up the Job. // In a production environment, there wouldn't be these guarantees, but the Pod would be cleaned up // by the orphan pod worker, when the Pod finishes. if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { pod, _ := manager.podStore.Pods(pod.GetNamespace()).Get(pod.Name) return pod != nil, nil }); err != nil { t.Fatalf("Waiting for Pod to appear in podLister: %v", err) } // Mark Job as complete. job.Status.Conditions = append(job.Status.Conditions, batch.JobCondition{ Type: batch.JobComplete, Status: v1.ConditionTrue, }) _, err = clientset.BatchV1().Jobs(job.GetNamespace()).UpdateStatus(ctx, job, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Updating job status: %v", err) } // Verify the pod finalizer is removed for a finished Job, // even if the jobs pods are not tracked by the main reconciliation loop. if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { p, err := clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) if err != nil { return false, err } return !hasJobTrackingFinalizer(p), nil }); err != nil { t.Errorf("Waiting for Pod to get the finalizer removed: %v", err) } } func checkJobCompletionLabel(t *testing.T, p *v1.PodTemplateSpec) { t.Helper() labels := p.GetLabels() if labels == nil || labels[batch.JobCompletionIndexAnnotation] == "" { t.Errorf("missing expected pod label %s", batch.JobCompletionIndexAnnotation) } } func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec, podIndexLabelDisabled bool) { t.Helper() var fieldPath string if podIndexLabelDisabled { fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation) } else { fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation) } want := []v1.EnvVar{ { Name: "JOB_COMPLETION_INDEX", ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ FieldPath: fieldPath, }, }, }, } for _, c := range spec.InitContainers { if diff := cmp.Diff(want, c.Env); diff != "" { t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff) } } for _, c := range spec.Containers { if diff := cmp.Diff(want, c.Env); diff != "" { t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff) } } } func podReplacementPolicy(m batch.PodReplacementPolicy) *batch.PodReplacementPolicy { return &m } func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) { t.Helper() verifyEmptyQueue(ctx, t, jm) awaitForQueueLen(ctx, t, jm, wantQueueLen) } func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) { t.Helper() verifyEmptyQueue(ctx, t, jm) if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) { if requeued := jm.queue.Len() == wantQueueLen; requeued { return true, nil } jm.clock.Sleep(fastRequeue) return false, nil }); err != nil { t.Errorf("Failed to await for expected queue.Len(). want %v, got: %v", wantQueueLen, jm.queue.Len()) } } func verifyEmptyQueue(ctx context.Context, t *testing.T, jm *Controller) { t.Helper() if jm.queue.Len() > 0 { t.Errorf("Unexpected queue.Len(). Want: %d, got: %d", 0, jm.queue.Len()) } } type podBuilder struct { *v1.Pod } func buildPod() podBuilder { return podBuilder{Pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: types.UID(rand.String(5)), }, }} } func getConditionsByType(list []batch.JobCondition, cType batch.JobConditionType) []*batch.JobCondition { var result []*batch.JobCondition for i := range list { if list[i].Type == cType { result = append(result, &list[i]) } } return result } func (pb podBuilder) name(n string) podBuilder { pb.Name = n return pb } func (pb podBuilder) ns(n string) podBuilder { pb.Namespace = n return pb } func (pb podBuilder) uid(u string) podBuilder { pb.UID = types.UID(u) return pb } func (pb podBuilder) job(j *batch.Job) podBuilder { pb.Labels = j.Spec.Selector.MatchLabels pb.Namespace = j.Namespace pb.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(j, controllerKind)} return pb } func (pb podBuilder) clearOwner() podBuilder { pb.OwnerReferences = nil return pb } func (pb podBuilder) clearLabels() podBuilder { pb.Labels = nil return pb } func (pb podBuilder) index(ix string) podBuilder { return pb.annotation(batch.JobCompletionIndexAnnotation, ix) } func (pb podBuilder) indexFailureCount(count string) podBuilder { return pb.annotation(batch.JobIndexFailureCountAnnotation, count) } func (pb podBuilder) indexIgnoredFailureCount(count string) podBuilder { return pb.annotation(batch.JobIndexIgnoredFailureCountAnnotation, count) } func (pb podBuilder) annotation(key, value string) podBuilder { if pb.Annotations == nil { pb.Annotations = make(map[string]string) } pb.Annotations[key] = value return pb } func (pb podBuilder) status(s v1.PodStatus) podBuilder { pb.Status = s return pb } func (pb podBuilder) phase(p v1.PodPhase) podBuilder { pb.Status.Phase = p return pb } func (pb podBuilder) trackingFinalizer() podBuilder { for _, f := range pb.Finalizers { if f == batch.JobTrackingFinalizer { return pb } } pb.Finalizers = append(pb.Finalizers, batch.JobTrackingFinalizer) return pb } func (pb podBuilder) deletionTimestamp() podBuilder { pb.DeletionTimestamp = &metav1.Time{} return pb } func (pb podBuilder) customDeletionTimestamp(t time.Time) podBuilder { pb.DeletionTimestamp = &metav1.Time{Time: t} return pb } func completionModePtr(m batch.CompletionMode) *batch.CompletionMode { return &m } func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() { origVal := *val *val = newVal return func() { *val = origVal } }