package job

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/json"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/util/feature"
	batchinformers "k8s.io/client-go/informers/batch/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	batchv1listers "k8s.io/client-go/listers/batch/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/job/metrics"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/utils/clock"
	"k8s.io/utils/ptr"
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

var (
	// syncJobBatchPeriod is the batching period for Job syncs triggered by pod events.
	syncJobBatchPeriod = time.Second
	// DefaultJobApiBackOff is the initial backoff for requeuing a Job after an API error.
	DefaultJobApiBackOff = time.Second
	// MaxJobApiBackOff is the maximum backoff for requeuing a Job after an API error.
	MaxJobApiBackOff = time.Minute
	// DefaultJobPodFailureBackOff is the initial backoff before recreating a pod after a failure.
	DefaultJobPodFailureBackOff = 10 * time.Second
	// MaxJobPodFailureBackOff is the maximum backoff before recreating a pod after failures.
	MaxJobPodFailureBackOff = 10 * time.Minute

	// MaxUncountedPods is the maximum number of pod UIDs accumulated in
	// .status.uncountedTerminatedPods before the status is flushed.
	MaxUncountedPods = 500

	// MaxPodCreateDeletePerSync is the maximum number of pods that can be
	// created or deleted in a single sync call.
	MaxPodCreateDeletePerSync = 500
)
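// Controller ensures that all Job objects have corresponding pods to
// run their configured workload.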
type Controller struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// To allow injection of the following handlers for testing.
	updateStatusHandler func(ctx context.Context, job *batch.Job) (*batch.Job, error)
	patchJobHandler     func(ctx context.Context, job *batch.Job, patch []byte) error
	syncHandler         func(ctx context.Context, jobKey string) error

	// podStoreSynced returns true if the pod store has been synced at least once.
	podStoreSynced cache.InformerSynced

	// jobStoreSynced returns true if the job store has been synced at least once.
	jobStoreSynced cache.InformerSynced

	// expectations tracks the pod creations/deletions this controller expects
	// to observe for each Job.
	expectations controller.ControllerExpectationsInterface

	// finalizerExpectations tracks the pod UIDs for which the controller
	// expects to observe the tracking finalizer removed.
	finalizerExpectations *uidTrackingExpectations

	// jobLister can list/get Jobs from the shared informer's store.
	jobLister batchv1listers.JobLister

	// podStore can list/get Pods from the shared informer's store.
	podStore corelisters.PodLister

	// queue holds Jobs that need to be synced.
	queue workqueue.RateLimitingInterface

	// orphanQueue holds orphan Pods that still carry the Job tracking finalizer.
	orphanQueue workqueue.RateLimitingInterface

	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder

	clock clock.WithTicker

	// podBackoffStore keeps the information needed to compute the exponential
	// backoff delay for pod recreation after pod failures.
	podBackoffStore *backoffStore
}
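// syncJobCtx aggregates the state computed during a single syncJob invocation
// and shared by its helper functions.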
type syncJobCtx struct {
	job                             *batch.Job
	pods                            []*v1.Pod
	finishedCondition               *batch.JobCondition
	activePods                      []*v1.Pod
	succeeded                       int32
	failed                          int32
	prevSucceededIndexes            orderedIntervals
	succeededIndexes                orderedIntervals
	failedIndexes                   *orderedIntervals
	newBackoffRecord                backoffRecord
	expectedRmFinalizers            sets.Set[string]
	uncounted                       *uncountedTerminatedPods
	podsWithDelayedDeletionPerIndex map[int]*v1.Pod
	terminating                     *int32
}
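// NewController creates a new Job controller that keeps the relevant pods
// in sync with their corresponding Job objects.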
func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) {
	return newControllerWithClock(ctx, podInformer, jobInformer, kubeClient, &clock.RealClock{})
}

func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) (*Controller, error) {
	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
	logger := klog.FromContext(ctx)

	jm := &Controller{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
		},
		expectations:          controller.NewControllerExpectations(),
		finalizerExpectations: newUIDTrackingExpectations(),
		queue:                 workqueue.NewRateLimitingQueueWithConfig(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.RateLimitingQueueConfig{Name: "job", Clock: clock}),
		orphanQueue:           workqueue.NewRateLimitingQueueWithConfig(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.RateLimitingQueueConfig{Name: "job_orphan_pod", Clock: clock}),
		broadcaster:           eventBroadcaster,
		recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
		clock:                 clock,
		podBackoffStore:       newBackoffStore(),
	}

	if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			jm.addJob(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			jm.updateJob(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			jm.deleteJob(logger, obj)
		},
	}); err != nil {
		return nil, fmt.Errorf("adding Job event handler: %w", err)
	}
	jm.jobLister = jobInformer.Lister()
	jm.jobStoreSynced = jobInformer.Informer().HasSynced

	if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			jm.addPod(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			jm.updatePod(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			jm.deletePod(logger, obj, true)
		},
	}); err != nil {
		return nil, fmt.Errorf("adding Pod event handler: %w", err)
	}
	jm.podStore = podInformer.Lister()
	jm.podStoreSynced = podInformer.Informer().HasSynced

	jm.updateStatusHandler = jm.updateJobStatus
	jm.patchJobHandler = jm.patchJob
	jm.syncHandler = jm.syncJob

	metrics.Register()

	return jm, nil
}
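// Run begins watching and syncing. It starts the requested number of sync
// workers plus the orphan-pod worker, and blocks until the context is
// cancelled.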
func (jm *Controller) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()
	logger := klog.FromContext(ctx)

	// Start the events processing pipeline.
	jm.broadcaster.StartStructuredLogging(3)
	jm.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
	defer jm.broadcaster.Shutdown()

	defer jm.queue.ShutDown()
	defer jm.orphanQueue.ShutDown()

	logger.Info("Starting job controller")
	defer logger.Info("Shutting down job controller")

	if !cache.WaitForNamedCacheSync("job", ctx.Done(), jm.podStoreSynced, jm.jobStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, jm.worker, time.Second)
	}

	go wait.UntilWithContext(ctx, jm.orphanWorker, time.Second)

	<-ctx.Done()
}
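// getPodJobs returns a list of Jobs whose selectors match the given Pod.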
func (jm *Controller) getPodJobs(pod *v1.Pod) []*batch.Job {
	jobs, err := jm.jobLister.GetPodJobs(pod)
	if err != nil {
		return nil
	}
	if len(jobs) > 1 {
		// More than one Job selecting the same pods is a user error; surface it
		// but keep going.
		utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
	}
	ret := make([]*batch.Job, 0, len(jobs))
	for i := range jobs {
		ret = append(ret, &jobs[i])
	}
	return ret
}
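// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.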
func (jm *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if job.UID != controllerRef.UID {
		// The Job we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return job
}
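// addPod is called when a pod is created. It enqueues the controlling Job,
// updating its expectations, or enqueues the pod for orphan cleanup if it has
// no controller but still carries the tracking finalizer.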
func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
	pod := obj.(*v1.Pod)
	recordFinishedPodWithTrackingFinalizer(nil, pod)
	if pod.DeletionTimestamp != nil {
		// On a restart of the controller, it's possible a new pod shows up in a
		// state that is already pending deletion. Prevent the pod from being a
		// creation observation.
		jm.deletePod(logger, pod, false)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		job := jm.resolveControllerRef(pod.Namespace, controllerRef)
		if job == nil {
			return
		}
		jobKey, err := controller.KeyFunc(job)
		if err != nil {
			return
		}
		jm.expectations.CreationObserved(logger, jobKey)
		jm.enqueueSyncJobBatched(logger, job)
		return
	}

	// Otherwise, it's an orphan.
	// Clean up the finalizer if the pod is still carrying it.
	if hasJobTrackingFinalizer(pod) {
		jm.enqueueOrphanPod(pod)
	}

	// Get a list of all matching Jobs and sync them to see if anyone wants to
	// adopt it. DO NOT observe creation because no controller should be
	// waiting for an orphan.
	for _, job := range jm.getPodJobs(pod) {
		jm.enqueueSyncJobBatched(logger, job)
	}
}
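// updatePod is called when a pod is updated. It figures out which Job(s)
// manage the pod and wakes them up. If the pod's labels or ControllerRef
// changed, both the old and the new Job are enqueued.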
func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	recordFinishedPodWithTrackingFinalizer(oldPod, curPod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}
	if curPod.DeletionTimestamp != nil {
		// When a pod is deleted gracefully, its deletion timestamp is first
		// modified to reflect a grace period, and only after that time has
		// passed does the kubelet actually delete it from the store. We receive
		// an update for the modification of the deletion timestamp and expect a
		// Job to create more pods asap, not wait until the kubelet actually
		// deletes the pod.
		jm.deletePod(logger, curPod, false)
		return
	}

	// Check whether the tracking finalizer was removed in this update, so the
	// removal can be recorded against the owning Job's expectations.
	finalizerRemoved := !hasJobTrackingFinalizer(curPod)
	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
			if finalizerRemoved {
				key, err := controller.KeyFunc(job)
				if err == nil {
					jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
				}
			}
			jm.enqueueSyncJobBatched(logger, job)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
		if job == nil {
			return
		}
		if finalizerRemoved {
			key, err := controller.KeyFunc(job)
			if err == nil {
				jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
			}
		}
		jm.enqueueSyncJobBatched(logger, job)
		return
	}

	// Otherwise, it's an orphan.
	// Clean up the finalizer if the pod is still carrying it.
	if hasJobTrackingFinalizer(curPod) {
		jm.enqueueOrphanPod(curPod)
	}

	// If anything changed, sync matching controllers to see if anyone wants to
	// adopt it now.
	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if labelChanged || controllerRefChanged {
		for _, job := range jm.getPodJobs(curPod) {
			jm.enqueueSyncJobBatched(logger, job)
		}
	}
}
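// deletePod enqueues the Job that manages the deleted pod and updates its
// expectations. obj could be a *v1.Pod or a cache.DeletedFinalStateUnknown
// marker item. final is true when the pod was actually removed from the
// store, and false when this is invoked for a pod that is only pending
// deletion.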
func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool) {
	pod, ok := obj.(*v1.Pod)
	if final {
		recordFinishedPodWithTrackingFinalizer(pod, nil)
	}

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which
	// contains the deleted key/value. Note that this value might be stale.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(pod)
	hasFinalizer := hasJobTrackingFinalizer(pod)
	if controllerRef == nil {
		// No controller should care about orphans being deleted, but the pod
		// might still carry the tracking finalizer.
		if hasFinalizer {
			jm.enqueueOrphanPod(pod)
		}
		return
	}
	job := jm.resolveControllerRef(pod.Namespace, controllerRef)
	if job == nil || IsJobFinished(job) {
		// syncJob will not remove this finalizer.
		if hasFinalizer {
			jm.enqueueOrphanPod(pod)
		}
		return
	}
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		return
	}
	jm.expectations.DeletionObserved(logger, jobKey)

	// Consider the finalizer removed if this is the final delete. Otherwise,
	// this is an update for the deletion timestamp, so check the finalizer.
	if final || !hasFinalizer {
		jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
	}

	jm.enqueueSyncJobBatched(logger, job)
}

func (jm *Controller) addJob(logger klog.Logger, obj interface{}) {
	jm.enqueueSyncJobImmediately(logger, obj)
	jobObj, ok := obj.(*batch.Job)
	if !ok {
		return
	}
	if controllerName := managedByExternalController(jobObj); controllerName != nil {
		metrics.JobByExternalControllerTotal.WithLabelValues(*controllerName).Inc()
	}
}
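// updateJob enqueues the Job on spec or status updates. Spec changes (a new
// generation) trigger an immediate sync; status-only changes are batched.
// When ActiveDeadlineSeconds changes, an additional sync is scheduled for the
// moment the deadline expires.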
func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
	oldJob := old.(*batch.Job)
	curJob := cur.(*batch.Job)

	key, err := controller.KeyFunc(curJob)
	if err != nil {
		return
	}

	if curJob.Generation == oldJob.Generation {
		// No generation change, so this is a status update; delay the sync to
		// batch such updates.
		jm.enqueueSyncJobBatched(logger, curJob)
	} else {
		// Trigger an immediate sync when the spec is changed.
		jm.enqueueSyncJobImmediately(logger, curJob)
	}

	// The Job shouldn't be marked as finished until all pod finalizers are
	// removed. This is a backup operation in this case.
	if IsJobFinished(curJob) {
		jm.cleanupPodFinalizers(curJob)
	}

	// Check if we need to schedule a new sync for ActiveDeadlineSeconds.
	if curJob.Status.StartTime != nil {
		curADS := curJob.Spec.ActiveDeadlineSeconds
		if curADS == nil {
			return
		}
		oldADS := oldJob.Spec.ActiveDeadlineSeconds
		if oldADS == nil || *oldADS != *curADS {
			passed := jm.clock.Since(curJob.Status.StartTime.Time)
			total := time.Duration(*curADS) * time.Second
			// AddAfter handles the case where total < passed.
			jm.queue.AddAfter(key, total-passed)
			logger.V(4).Info("job's ActiveDeadlineSeconds updated, will rsync", "key", key, "interval", total-passed)
		}
	}
}
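// deleteJob enqueues the Job for an immediate sync and enqueues all of its
// pods that still carry the tracking finalizer. obj could be a *batch.Job, or
// a cache.DeletedFinalStateUnknown marker item.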
func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
	jm.enqueueSyncJobImmediately(logger, obj)
	jobObj, ok := obj.(*batch.Job)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
			return
		}
		jobObj, ok = tombstone.Obj.(*batch.Job)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a job %+v", obj))
			return
		}
	}
	jm.cleanupPodFinalizers(jobObj)
}
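// enqueueSyncJobImmediately tells the controller to invoke syncJob as soon as
// possible. It is only used for Job events (creation, deletion, spec update).
// obj could be a *batch.Job, or a DeletionFinalStateUnknown marker item.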
func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interface{}) {
	jm.enqueueSyncJobInternal(logger, obj, 0)
}

// enqueueSyncJobBatched tries to batch syncs triggered by pod events by
// delaying the enqueue by the batch period.
func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
	jm.enqueueSyncJobInternal(logger, obj, syncJobBatchPeriod)
}

// enqueueSyncJobWithDelay enqueues the Job with a custom delay, but not
// smaller than the batch period.
func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
	if delay < syncJobBatchPeriod {
		delay = syncJobBatchPeriod
	}
	jm.enqueueSyncJobInternal(logger, obj, delay)
}

func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{}, delay time.Duration) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}
	logger.Info("enqueueing job", "key", key)
	jm.queue.AddAfter(key, delay)
}

func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	jm.orphanQueue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (jm *Controller) worker(ctx context.Context) {
	for jm.processNextWorkItem(ctx) {
	}
}

func (jm *Controller) processNextWorkItem(ctx context.Context) bool {
	key, quit := jm.queue.Get()
	if quit {
		return false
	}
	defer jm.queue.Done(key)

	err := jm.syncHandler(ctx, key.(string))
	if err == nil {
		jm.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("syncing job: %w", err))
	jm.queue.AddRateLimited(key)

	return true
}

func (jm *Controller) orphanWorker(ctx context.Context) {
	for jm.processNextOrphanPod(ctx) {
	}
}

func (jm *Controller) processNextOrphanPod(ctx context.Context) bool {
	key, quit := jm.orphanQueue.Get()
	if quit {
		return false
	}
	defer jm.orphanQueue.Done(key)
	err := jm.syncOrphanPod(ctx, key.(string))
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Error syncing orphan pod: %v", err))
		jm.orphanQueue.AddRateLimited(key)
	} else {
		jm.orphanQueue.Forget(key)
	}

	return true
}

// syncOrphanPod removes the tracking finalizer from an orphan pod if the pod
// is no longer owned by an active Job managed by this controller.
func (jm *Controller) syncOrphanPod(ctx context.Context, key string) error {
	startTime := jm.clock.Now()
	logger := klog.FromContext(ctx)
	defer func() {
		logger.V(4).Info("Finished syncing orphan pod", "pod", key, "elapsed", jm.clock.Since(startTime))
	}()

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	sharedPod, err := jm.podStore.Pods(ns).Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			logger.V(4).Info("Orphan pod has been deleted", "pod", key)
			return nil
		}
		return err
	}

	if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil {
		job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef)
		if job != nil {
			// Skip cleanup when the owning Job is managed by another controller.
			if controllerName := managedByExternalController(job); controllerName != nil {
				logger.V(2).Info("Skip cleanup of the job finalizer for a pod owned by a job that is managed by an external controller", "key", key, "podUID", sharedPod.UID, "jobUID", job.UID, "controllerName", controllerName)
				return nil
			}
		}
		if job != nil && !IsJobFinished(job) {
			// The pod is controlled by a running Job; its sync loop owns the finalizer.
			return nil
		}
	}
	if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil {
		if err := jm.podControl.PatchPod(ctx, ns, name, patch); err != nil && !apierrors.IsNotFound(err) {
			return err
		}
	}
	return nil
}
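// getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning, adding the tracking
// finalizer to adopted pods. Note that the returned Pods are pointers into
// the cache.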
func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
	}
	// List all pods to include those that don't match the selector anymore
	// but have a ControllerRef pointing to this controller.
	pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}
	// If any adoptions are attempted, we should first recheck for deletion
	// with an uncached quorum read sometime after listing Pods.
	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
		fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != j.UID {
			return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID)
		}
		return fresh, nil
	})
	cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, batch.JobTrackingFinalizer)

	pods, err = cm.ClaimPods(ctx, pods)
	if err != nil {
		return pods, err
	}
	// Set the finalizer on adopted pods for the remaining calculations.
	for i, p := range pods {
		adopted := true
		for _, r := range p.OwnerReferences {
			if r.UID == j.UID {
				adopted = false
				break
			}
		}
		if adopted && !hasJobTrackingFinalizer(p) {
			pods[i] = p.DeepCopy()
			pods[i].Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
		}
	}
	return pods, err
}
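// syncJob will sync the Job with the given key if it has had its expectations
// fulfilled, meaning it did not expect to see any more of its pods created or
// deleted. This function is not meant to be invoked concurrently with the
// same key.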
func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
	startTime := jm.clock.Now()
	logger := klog.FromContext(ctx)
	defer func() {
		logger.V(4).Info("Finished syncing job", "key", key, "elapsed", jm.clock.Since(startTime))
	}()

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	if len(ns) == 0 || len(name) == 0 {
		return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
	}
	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			logger.V(4).Info("Job has been deleted", "key", key)
			jm.expectations.DeleteExpectations(logger, key)
			jm.finalizerExpectations.deleteExpectations(logger, key)

			err := jm.podBackoffStore.removeBackoffRecord(key)
			if err != nil {
				// Re-sync here, as the record has to be removed for finished/deleted jobs.
				return fmt.Errorf("error removing backoff record %w", err)
			}
			return nil
		}
		return err
	}

	// Skip syncing of the Job if it is managed by another controller.
	if controllerName := managedByExternalController(sharedJob); controllerName != nil {
		logger.V(2).Info("Skip syncing the job as it is managed by an external controller", "key", key, "uid", sharedJob.UID, "controllerName", controllerName)
		return nil
	}

	// Make a copy so we don't mutate the shared informer cache.
	job := *sharedJob.DeepCopy()

	// If the Job was finished previously, we don't want to redo the termination.
	if IsJobFinished(&job) {
		err := jm.podBackoffStore.removeBackoffRecord(key)
		if err != nil {
			// Re-sync here, as the record has to be removed for finished/deleted jobs.
			return fmt.Errorf("error removing backoff record %w", err)
		}
		return nil
	}

	if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion {
		jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown")
		return nil
	}

	completionMode := getCompletionMode(&job)
	action := metrics.JobSyncActionReconciling

	defer func() {
		result := "success"
		if rErr != nil {
			result = "error"
		}

		metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(jm.clock.Since(startTime).Seconds())
		metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
	}()

	if job.Status.UncountedTerminatedPods == nil {
		job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
	}

	// Check the expectations of the Job before counting active pods, otherwise
	// a new pod can sneak in and update the expectations after we've retrieved
	// active pods from the store.
	satisfiedExpectations := jm.expectations.SatisfiedExpectations(logger, key)

	pods, err := jm.getPodsForJob(ctx, &job)
	if err != nil {
		return err
	}
	var terminating *int32
	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
		terminating = ptr.To(controller.CountTerminatingPods(pods))
	}
	jobCtx := &syncJobCtx{
		job:                  &job,
		pods:                 pods,
		activePods:           controller.FilterActivePods(logger, pods),
		terminating:          terminating,
		uncounted:            newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
		expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
	}
	active := int32(len(jobCtx.activePods))
	newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
	jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
	jobCtx.failed = job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed))
	var ready *int32
	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
		ready = ptr.To(countReadyPods(jobCtx.activePods))
	}

	// Job first start. Set StartTime only when the Job is not in the suspended state.
	if job.Status.StartTime == nil && !jobSuspended(&job) {
		now := metav1.NewTime(jm.clock.Now())
		job.Status.StartTime = &now
	}

	jobCtx.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)

	var manageJobErr error

	exceedsBackoffLimit := jobCtx.failed > *job.Spec.BackoffLimit
	jobCtx.finishedCondition = hasSuccessCriteriaMetCondition(&job)

	// If the Job already has the SuccessCriteriaMet condition, the termination
	// decision was already made in a previous sync.
	if jobCtx.finishedCondition == nil && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
			jobCtx.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
		} else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
			// Prepare the interim FailureTarget condition to record the failure
			// message before the finalizers (allowing removal of the pods) are removed.
			jobCtx.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, batch.JobReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
		}
	}
	if jobCtx.finishedCondition == nil {
		if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
			// Fail the Job if the number of container restarts exceeds the backoff
			// limit (for restartPolicy OnFailure) OR the count of failed pods
			// exceeds the backoff limit.
			jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit", jm.clock.Now())
		} else if jm.pastActiveDeadline(&job) {
			jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline", jm.clock.Now())
		} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
			syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
			logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
			jm.queue.AddAfter(key, syncDuration)
		}
	}

	if isIndexedJob(&job) {
		jobCtx.prevSucceededIndexes, jobCtx.succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
		jobCtx.succeeded = int32(jobCtx.succeededIndexes.total())
		if hasBackoffLimitPerIndex(&job) {
			jobCtx.failedIndexes = calculateFailedIndexes(logger, &job, pods)
			if jobCtx.finishedCondition == nil {
				if job.Spec.MaxFailedIndexes != nil && jobCtx.failedIndexes.total() > int(*job.Spec.MaxFailedIndexes) {
					jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonMaxFailedIndexesExceeded, "Job has exceeded the specified maximal number of failed indexes", jm.clock.Now())
				} else if jobCtx.failedIndexes.total() > 0 && jobCtx.failedIndexes.total()+jobCtx.succeededIndexes.total() >= int(*job.Spec.Completions) {
					jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonFailedIndexes, "Job has failed indexes", jm.clock.Now())
				}
			}
			jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
		}
		if jobCtx.finishedCondition == nil && hasSuccessCriteriaMetCondition(jobCtx.job) == nil {
			if msg, met := matchSuccessPolicy(logger, job.Spec.SuccessPolicy, *job.Spec.Completions, jobCtx.succeededIndexes); met {
				jobCtx.finishedCondition = newCondition(batch.JobSuccessCriteriaMet, v1.ConditionTrue, batch.JobReasonSuccessPolicy, msg, jm.clock.Now())
			}
		}
	}
	suspendCondChanged := false
	// Delete active pods when the Job has a finished condition.
	if jobCtx.finishedCondition != nil {
		deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods)
		if deleted != active || !satisfiedExpectations {
			// Can't declare the Job as finished yet, as there might be remaining
			// pod finalizers or pods that are not in the informer's cache yet.
			jobCtx.finishedCondition = nil
		}
		active -= deleted
		manageJobErr = err
	} else {
		manageJobCalled := false
		if satisfiedExpectations && job.DeletionTimestamp == nil {
			active, action, manageJobErr = jm.manageJob(ctx, &job, jobCtx)
			manageJobCalled = true
		}
		complete := false
		if job.Spec.Completions == nil {
			// The Job has no fixed completion count: it is complete once any pod
			// has succeeded and there are no active pods left.
			complete = jobCtx.succeeded > 0 && active == 0
		} else {
			// The Job specifies a completion count: it is complete once the number
			// of successes reaches that count and there are no active pods left.
			complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
		}
		if complete {
			jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
		} else if manageJobCalled {
			// Update the conditions / emit events only if manageJob was called.
			if job.Spec.Suspend != nil && *job.Spec.Suspend {
				// Job can be in the suspended state only if it is NOT completed yet.
				var isUpdated bool
				job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", jm.clock.Now())
				if isUpdated {
					suspendCondChanged = true
					jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
				}
			} else {
				// Job not suspended.
				var isUpdated bool
				job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed", jm.clock.Now())
				if isUpdated {
					suspendCondChanged = true
					jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")

					// Resumed Jobs always reset StartTime to the current time. This is
					// done because the ActiveDeadlineSeconds timer shouldn't go off
					// whilst the Job is still suspended, and resetting StartTime is
					// consistent with resuming a Job created in the suspended state.
					now := metav1.NewTime(jm.clock.Now())
					job.Status.StartTime = &now
				}
			}
		}
	}

	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
	job.Status.Active = active
	job.Status.Ready = ready
	job.Status.Terminating = jobCtx.terminating
	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
	if err != nil {
		return fmt.Errorf("tracking status: %w", err)
	}

	return manageJobErr
}
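// deleteActivePods issues deletion for the given active Pods of a Job,
// keeping their tracking finalizers in place. It returns the number of
// successful deletions and an aggregated error from the failed ones.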
func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, error) {
	errCh := make(chan error, len(pods))
	successfulDeletes := int32(len(pods))
	wg := sync.WaitGroup{}
	wg.Add(len(pods))
	for i := range pods {
		go func(pod *v1.Pod) {
			defer wg.Done()
			if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil && !apierrors.IsNotFound(err) {
				atomic.AddInt32(&successfulDeletes, -1)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(pods[i])
	}
	wg.Wait()
	return successfulDeletes, errorFromChannel(errCh)
}

func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int {
	result := len(failedPods)
	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
		for _, p := range failedPods {
			_, countFailed, _ := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, p)
			if !countFailed {
				result--
			}
		}
	}
	return result
}

// deleteJobPods deletes the pods, returning the number of successful removals
// and any error.
func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
	errCh := make(chan error, len(pods))
	successfulDeletes := int32(len(pods))
	logger := klog.FromContext(ctx)

	failDelete := func(pod *v1.Pod, err error) {
		// Decrement the expected number of deletes because the informer won't
		// observe this deletion.
		jm.expectations.DeletionObserved(logger, jobKey)
		if !apierrors.IsNotFound(err) {
			logger.V(2).Info("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err)
			atomic.AddInt32(&successfulDeletes, -1)
			errCh <- err
			utilruntime.HandleError(err)
		}
	}

	wg := sync.WaitGroup{}
	wg.Add(len(pods))
	for i := range pods {
		go func(pod *v1.Pod) {
			defer wg.Done()
			if patch := removeTrackingFinalizerPatch(pod); patch != nil {
				if err := jm.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patch); err != nil {
					failDelete(pod, fmt.Errorf("removing completion finalizer: %w", err))
					return
				}
			}
			if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil {
				failDelete(pod, err)
			}
		}(pods[i])
	}
	wg.Wait()
	return successfulDeletes, errorFromChannel(errCh)
}
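// trackJobStatusAndRemoveFinalizers does:
//  1. Add finished Pods to .status.uncountedTerminatedPods
//  2. Remove the finalizers from those Pods
//  3. Remove the Pod UIDs from .status.uncountedTerminatedPods and add the
//     counts to .status.succeeded and .status.failed, flushing the status as
//     needed.
//
// It does this for a bounded number of Pods per call, so that the size of
// .status doesn't grow too much and the sync of a big Job doesn't starve
// other Jobs.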
func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, needsFlush bool) error {
	logger := klog.FromContext(ctx)

	isIndexed := isIndexedJob(jobCtx.job)
	var podsToRemoveFinalizer []*v1.Pod
	uncountedStatus := jobCtx.job.Status.UncountedTerminatedPods
	var newSucceededIndexes []int
	if isIndexed {
		// Sort to introduce completed indexes in order.
		sort.Sort(byCompletionIndex(jobCtx.pods))
	}
	uidsWithFinalizer := make(sets.Set[string], len(jobCtx.pods))
	for _, p := range jobCtx.pods {
		uid := string(p.UID)
		if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(uid) {
			uidsWithFinalizer.Insert(uid)
		}
	}

	// Shallow copy, as it will only be used to detect changes in the counters.
	oldCounters := jobCtx.job.Status
	if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) {
		needsFlush = true
	}
	podFailureCountByPolicyAction := map[string]int{}
	reachedMaxUncountedPods := false
	for _, pod := range jobCtx.pods {
		if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(string(pod.UID)) {
			// This pod was already processed in a previous sync.
			continue
		}
		considerPodFailed := isPodFailed(pod, jobCtx.job)
		if !canRemoveFinalizer(logger, jobCtx, pod, considerPodFailed) {
			continue
		}
		podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
		if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) {
			if isIndexed {
				// The completion index is enough to avoid recounting succeeded pods.
				// No need to track UIDs.
				ix := getCompletionIndex(pod.Annotations)
				if ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) && !jobCtx.prevSucceededIndexes.has(ix) {
					newSucceededIndexes = append(newSucceededIndexes, ix)
					needsFlush = true
				}
			} else if !jobCtx.uncounted.succeeded.Has(string(pod.UID)) {
				needsFlush = true
				uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID)
			}
		} else if considerPodFailed || (jobCtx.finishedCondition != nil && !isSuccessCriteriaMetCondition(jobCtx.finishedCondition)) {
			// When the Job is considered finished, every non-terminated pod is
			// considered failed.
			ix := getCompletionIndex(pod.Annotations)
			if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
				if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
					_, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod)
					if action != nil {
						podFailureCountByPolicyAction[string(*action)] += 1
					}
					if countFailed {
						needsFlush = true
						uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
					}
				} else {
					needsFlush = true
					uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
				}
			}
		}
		if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= MaxUncountedPods {
			// Enough Pods were already added to .status.uncountedTerminatedPods.
			// Stop counting pods and removing finalizers here to keep the status
			// size bounded and to avoid starving smaller Jobs while syncing a big one.
			reachedMaxUncountedPods = true
			break
		}
	}
	if isIndexed {
		jobCtx.succeededIndexes = jobCtx.succeededIndexes.withOrderedIndexes(newSucceededIndexes)
		succeededIndexesStr := jobCtx.succeededIndexes.String()
		if succeededIndexesStr != jobCtx.job.Status.CompletedIndexes {
			needsFlush = true
		}
		jobCtx.job.Status.Succeeded = int32(jobCtx.succeededIndexes.total())
		jobCtx.job.Status.CompletedIndexes = succeededIndexesStr
		var failedIndexesStr *string
		if jobCtx.failedIndexes != nil {
			failedIndexesStr = ptr.To(jobCtx.failedIndexes.String())
		}
		if !ptr.Equal(jobCtx.job.Status.FailedIndexes, failedIndexesStr) {
			jobCtx.job.Status.FailedIndexes = failedIndexesStr
			needsFlush = true
		}
	}
	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
		if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget {
			// Append the interim FailureTarget condition to update the Job status
			// with before the finalizers are removed.
			jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition)
			needsFlush = true

			// Prepare the final Failed condition to update the Job status with
			// after the finalizers are removed. It is also used by the
			// enactJobFinished function for reporting.
			jobCtx.finishedCondition = newFailedConditionForFailureTarget(jobCtx.finishedCondition, jm.clock.Now())
		}
	}
	if isSuccessCriteriaMetCondition(jobCtx.finishedCondition) {
		// Append the SuccessCriteriaMet condition if it is not yet present.
		if hasSuccessCriteriaMetCondition(jobCtx.job) == nil {
			jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition)
			needsFlush = true
		}

		// Prepare the Complete condition to update the Job status with after
		// the finalizers are removed.
		jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, jobCtx.finishedCondition.Reason, jobCtx.finishedCondition.Message, jm.clock.Now())
	}
	var err error
	if jobCtx.job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, jobCtx, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil {
		return err
	}
	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(jobCtx.job, jobCtx.finishedCondition)
	if jobFinished {
		needsFlush = true
	}
	if needsFlush {
		if _, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
			return fmt.Errorf("removing uncounted pods from status: %w", err)
		}
		if jobFinished {
			jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
		}
		recordJobPodFinished(logger, jobCtx.job, oldCounters)
	}
	return nil
}
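// canRemoveFinalizer determines whether the pod's Job tracking finalizer can
// be removed safely. It can be removed when the Job is being deleted or has a
// finished condition, when the pod succeeded, or when the pod is considered
// failed, unless its removal is delayed so that the index failure count can
// be transferred to the replacement pod (backoff limit per index).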
func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, considerPodFailed bool) bool {
	if jobCtx.job.DeletionTimestamp != nil || jobCtx.finishedCondition != nil || pod.Status.Phase == v1.PodSucceeded {
		return true
	}
	if !considerPodFailed {
		return false
	}
	if hasBackoffLimitPerIndex(jobCtx.job) {
		if index := getCompletionIndex(pod.Annotations); index != unknownCompletionIndex {
			if p, ok := jobCtx.podsWithDelayedDeletionPerIndex[index]; ok && p.UID == pod.UID {
				logger.V(3).Info("Delaying pod finalizer removal to await for pod recreation within the index", "pod", klog.KObj(pod))
				return false
			}
		}
	}
	return true
}
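// flushUncountedAndRemoveFinalizers does:
//  1. flush the Job status that might include new uncounted Pod UIDs,
//     along with any interim FailureTarget or SuccessCriteriaMet condition.
//  2. remove the finalizers from the given Pods.
//  3. update the counters based on the Pods for which the finalizers were
//     successfully removed.
//  4. (if not all removals succeeded) flush the Job status again.
//
// It returns the updated Job, whether there are pending status changes that
// still need to be flushed, and any error.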
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
	logger := klog.FromContext(ctx)
	var err error
	if needsFlush {
		if jobCtx.job, err = jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
			return jobCtx.job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
		}

		err = jm.podBackoffStore.updateBackoffRecord(jobCtx.newBackoffRecord)

		if err != nil {
			// This error might undercount the backoff. Re-syncing from the current
			// state might not help to recover the backoff, so just log the error.
			logger.Error(err, "Backoff update failed")
		}

		recordJobPodFinished(logger, jobCtx.job, *oldCounters)
		// Shallow copy, as it will only be used to detect changes in the counters.
		*oldCounters = jobCtx.job.Status
		needsFlush = false
	}
	recordJobPodFailurePolicyActions(jobCtx.job, podFailureCountByPolicyAction)

	jobKey, err := controller.KeyFunc(jobCtx.job)
	if err != nil {
		return jobCtx.job, needsFlush, fmt.Errorf("getting job key: %w", err)
	}
	var rmErr error
	if len(podsToRemoveFinalizer) > 0 {
		var rmSucceded []bool
		rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(ctx, jobKey, podsToRemoveFinalizer)
		for i, p := range podsToRemoveFinalizer {
			if rmSucceded[i] {
				uidsWithFinalizer.Delete(string(p.UID))
			}
		}
	}

	// Move the counters for the pods whose finalizers were removed out of
	// .status.uncountedTerminatedPods.
	if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) {
		needsFlush = true
	}
	if rmErr != nil && needsFlush {
		if job, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
			return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
		}
	}
	return jobCtx.job, needsFlush, rmErr
}

// cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
// .status.uncountedTerminatedPods for which the finalizer was successfully
// removed and increments the corresponding status counters.
// Returns whether there was any status change.
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool {
	updated := false
	uncountedStatus := status.UncountedTerminatedPods
	newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer)
	if len(newUncounted) != len(uncountedStatus.Succeeded) {
		updated = true
		status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted))
		uncountedStatus.Succeeded = newUncounted
	}
	newUncounted = filterInUncountedUIDs(uncountedStatus.Failed, uidsWithFinalizer)
	if len(newUncounted) != len(uncountedStatus.Failed) {
		updated = true
		status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted))
		uncountedStatus.Failed = newUncounted
	}
	return updated
}

// removeTrackingFinalizerFromPods removes the tracking finalizer from the
// given pods and returns a slice of booleans where the i-th value is true if
// the finalizer of the i-th pod was successfully removed (a pod that was
// already deleted counts as successfully removed).
func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey string, pods []*v1.Pod) ([]bool, error) {
	logger := klog.FromContext(ctx)
	errCh := make(chan error, len(pods))
	succeeded := make([]bool, len(pods))
	uids := make([]string, len(pods))
	for i, p := range pods {
		uids[i] = string(p.UID)
	}
	if jobKey != "" {
		err := jm.finalizerExpectations.expectFinalizersRemoved(logger, jobKey, uids)
		if err != nil {
			return succeeded, fmt.Errorf("setting expected removed finalizers: %w", err)
		}
	}
	wg := sync.WaitGroup{}
	wg.Add(len(pods))
	for i := range pods {
		go func(i int) {
			pod := pods[i]
			defer wg.Done()
			if patch := removeTrackingFinalizerPatch(pod); patch != nil {
				if err := jm.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patch); err != nil {
					// In case of any failure, we don't expect a Pod update for the
					// removed finalizer. Clear the expectation now.
					if jobKey != "" {
						jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
					}
					if !apierrors.IsNotFound(err) {
						errCh <- err
						utilruntime.HandleError(fmt.Errorf("removing tracking finalizer: %w", err))
						return
					}
				}
				succeeded[i] = true
			}
		}(i)
	}
	wg.Wait()

	return succeeded, errorFromChannel(errCh)
}

// enactJobFinished adds the finished condition to the Job status, provided
// that all terminated pods have already been accounted for. Returns whether
// the Job was marked finished.
func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
	if finishedCond == nil {
		return false
	}
	if uncounted := job.Status.UncountedTerminatedPods; uncounted != nil {
		if len(uncounted.Succeeded) > 0 || len(uncounted.Failed) > 0 {
			return false
		}
	}
	job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message, jm.clock.Now())
	if finishedCond.Type == batch.JobComplete {
		job.Status.CompletionTime = &finishedCond.LastTransitionTime
	}
	return true
}

// recordJobFinished records events and metrics for a finished Job.
func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
	completionMode := getCompletionMode(job)
	if finishedCond.Type == batch.JobComplete {
		if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions {
			jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
		}
		jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
		metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded", "").Inc()
	} else {
		jm.recorder.Event(job, v1.EventTypeWarning, finishedCond.Reason, finishedCond.Message)
		metrics.JobFinishedNum.WithLabelValues(completionMode, "failed", finishedCond.Reason).Inc()
	}
	return true
}

func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[string]) []types.UID {
	var newUncounted []types.UID
	for _, uid := range uncounted {
		if include.Has(string(uid)) {
			newUncounted = append(newUncounted, uid)
		}
	}
	return newUncounted
}

// newFailedConditionForFailureTarget creates a new Failed condition based on
// the interim FailureTarget condition.
func newFailedConditionForFailureTarget(condition *batch.JobCondition, now time.Time) *batch.JobCondition {
	return newCondition(batch.JobFailed, v1.ConditionTrue, condition.Reason, condition.Message, now)
}

// pastBackoffLimitOnFailure checks if the sum of container restart counts
// exceeds BackoffLimit. This method applies only to pods with
// restartPolicy == OnFailure.
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
		return false
	}
	result := int32(0)
	for i := range pods {
		po := pods[i]
		if po.Status.Phase == v1.PodRunning || po.Status.Phase == v1.PodPending {
			for j := range po.Status.InitContainerStatuses {
				stat := po.Status.InitContainerStatuses[j]
				result += stat.RestartCount
			}
			for j := range po.Status.ContainerStatuses {
				stat := po.Status.ContainerStatuses[j]
				result += stat.RestartCount
			}
		}
	}
	if *job.Spec.BackoffLimit == 0 {
		return result > 0
	}
	return result >= *job.Spec.BackoffLimit
}

// pastActiveDeadline checks if the Job has ActiveDeadlineSeconds set and
// whether it has been exceeded. Returns false if ActiveDeadlineSeconds is not
// set, the Job has no start time, or the Job is suspended.
func (jm *Controller) pastActiveDeadline(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil || jobSuspended(job) {
		return false
	}
	duration := jm.clock.Since(job.Status.StartTime.Time)
	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}

func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) *batch.JobCondition {
	return &batch.JobCondition{
		Type:               conditionType,
		Status:             status,
		LastProbeTime:      metav1.NewTime(now),
		LastTransitionTime: metav1.NewTime(now),
		Reason:             reason,
		Message:            message,
	}
}

// getFailJobMessage returns a Job failure message if the Job should fail
// based on the pod failure policy and the current pods.
func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {
	if !feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || job.Spec.PodFailurePolicy == nil {
		return nil
	}
	for _, p := range pods {
		if isPodFailed(p, job) {
			jobFailureMessage, _, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
			if jobFailureMessage != nil {
				return jobFailureMessage
			}
		}
	}
	return nil
}

// getNewFinishedPods returns the lists of newly succeeded and failed pods
// that are not yet accounted for in the Job status counters.
func getNewFinishedPods(jobCtx *syncJobCtx) (succeededPods, failedPods []*v1.Pod) {
	succeededPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Succeeded(), func(p *v1.Pod) bool {
		return p.Status.Phase == v1.PodSucceeded
	})
	failedPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Failed(), func(p *v1.Pod) bool {
		return isPodFailed(p, jobCtx.job)
	})
	return succeededPods, failedPods
}

// jobSuspended returns whether a Job is suspended.
func jobSuspended(job *batch.Job) bool {
	return job.Spec.Suspend != nil && *job.Spec.Suspend
}
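// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the Job spec.
// It respects back-offs: it does not create new pods if the back-off time has
// not yet elapsed. It does NOT modify jobCtx.activePods.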
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
	logger := klog.FromContext(ctx)
	active := int32(len(jobCtx.activePods))
	parallelism := *job.Spec.Parallelism
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
		return 0, metrics.JobSyncActionTracking, nil
	}

	if jobSuspended(job) {
		logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
		podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
		active -= removed
		return active, metrics.JobSyncActionPodsDeleted, err
	}

	var terminating int32 = 0
	if onlyReplaceFailedPods(jobCtx.job) {
		// When pods should only be replaced once fully failed, count terminating
		// pods so that replacements are not created for them yet.
		if jobCtx.terminating == nil {
			terminating = controller.CountTerminatingPods(jobCtx.pods)
		} else {
			terminating = *jobCtx.terminating
		}
	}
	wantActive := int32(0)
	if job.Spec.Completions == nil {
		// The Job does not specify a number of completions. Therefore, the number
		// of active pods should equal parallelism, unless the Job has seen at
		// least one success, in which case leave whatever is running, running.
		if jobCtx.succeeded > 0 {
			wantActive = active
		} else {
			wantActive = parallelism
		}
	} else {
		// The Job specifies a number of completions. Therefore, the number of
		// active pods should never exceed the number of remaining completions.
		wantActive = *job.Spec.Completions - jobCtx.succeeded
		if wantActive > parallelism {
			wantActive = parallelism
		}
		if wantActive < 0 {
			wantActive = 0
		}
	}

	rmAtLeast := active - wantActive
	if rmAtLeast < 0 {
		rmAtLeast = 0
	}
	podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast))
	if len(podsToDelete) > MaxPodCreateDeletePerSync {
		podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
	}
	if len(podsToDelete) > 0 {
		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
		logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
		active -= removed
		// While it is possible for a Job to require both pod creations and
		// deletions at the same time (e.g. indexed Jobs with repeated indexes),
		// we restrict ourselves to either just pod deletion or pod creation in
		// any given sync cycle. Of these two, pod deletion takes precedence.
		return active, metrics.JobSyncActionPodsDeleted, err
	}

	if diff := wantActive - terminating - active; diff > 0 {
		var remainingTime time.Duration
		if !hasBackoffLimitPerIndex(job) {
			// The global backoff applies; with backoffLimitPerIndex the remaining
			// time is computed per index instead.
			remainingTime = jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
		}
		if remainingTime > 0 {
			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
			return 0, metrics.JobSyncActionPodsCreated, nil
		}
		if diff > int32(MaxPodCreateDeletePerSync) {
			diff = int32(MaxPodCreateDeletePerSync)
		}

		var indexesToAdd []int
		if isIndexedJob(job) {
			indexesToAdd = firstPendingIndexes(jobCtx, int(diff), int(*job.Spec.Completions))
			if hasBackoffLimitPerIndex(job) {
				indexesToAdd, remainingTime = jm.getPodCreationInfoForIndependentIndexes(logger, indexesToAdd, jobCtx.podsWithDelayedDeletionPerIndex)
				if remainingTime > 0 {
					jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
					return 0, metrics.JobSyncActionPodsCreated, nil
				}
			}
			diff = int32(len(indexesToAdd))
		}

		jm.expectations.ExpectCreations(logger, jobKey, int(diff))
		errCh := make(chan error, diff)
		logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff)

		wait := sync.WaitGroup{}

		active += diff

		podTemplate := job.Spec.Template.DeepCopy()
		if isIndexedJob(job) {
			addCompletionIndexEnvVariables(podTemplate)
		}
		podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)

		var creationsSucceeded, creationsFailed int32 = 0, 0

		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example, a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
		for batchSize := min(diff, int32(controller.SlowStartInitialBatchSize)); diff > 0; batchSize = min(2*batchSize, diff) {
			errorCount := len(errCh)
			wait.Add(int(batchSize))
			for i := int32(0); i < batchSize; i++ {
				completionIndex := unknownCompletionIndex
				if len(indexesToAdd) > 0 {
					completionIndex = indexesToAdd[0]
					indexesToAdd = indexesToAdd[1:]
				}
				go func() {
					template := podTemplate
					generateName := ""
					if completionIndex != unknownCompletionIndex {
						template = podTemplate.DeepCopy()
						addCompletionIndexAnnotation(template, completionIndex)

						if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
							addCompletionIndexLabel(template, completionIndex)
						}
						template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
						generateName = podGenerateNameWithIndex(job.Name, completionIndex)
						if hasBackoffLimitPerIndex(job) {
							addIndexFailureCountAnnotation(logger, template, job, jobCtx.podsWithDelayedDeletionPerIndex[completionIndex])
						}
					}
					defer wait.Done()
					err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
					if err != nil {
						if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
							// If the namespace is being torn down, we can safely ignore
							// this error since all subsequent creations will fail.
							return
						}
					}
					if err != nil {
						defer utilruntime.HandleError(err)
						// Decrement the expected number of creates because the informer
						// won't observe this pod.
						logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job))
						jm.expectations.CreationObserved(logger, jobKey)
						atomic.AddInt32(&active, -1)
						errCh <- err
						atomic.AddInt32(&creationsFailed, 1)
					}
					atomic.AddInt32(&creationsSucceeded, 1)
				}()
			}
			wait.Wait()

			skippedPods := diff - batchSize
			if errorCount < len(errCh) && skippedPods > 0 {
				logger.V(2).Info("Slow-start failure. Skipping creating pods, decrementing expectations", "skippedCount", skippedPods, "job", klog.KObj(job))
				active -= skippedPods
				for i := int32(0); i < skippedPods; i++ {
					// Decrement the expected number of creates because the informer
					// won't observe this pod.
					jm.expectations.CreationObserved(logger, jobKey)
				}
				// The skipped pods will be retried later. The next controller
				// resync will retry the slow start process.
				break
			}
			diff -= batchSize
		}
		recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed)
		return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
	}

	return active, metrics.JobSyncActionTracking, nil
}

// getPodCreationInfoForIndependentIndexes returns a sub-list of indexes for
// which pods can be created now and, if none can, the minimal remaining time
// before a pod can be created for any of the indexes.
func (jm *Controller) getPodCreationInfoForIndependentIndexes(logger klog.Logger, indexesToAdd []int, podsWithDelayedDeletionPerIndex map[int]*v1.Pod) ([]int, time.Duration) {
	var indexesToAddNow []int
	var minRemainingTimePerIndex *time.Duration
	for _, indexToAdd := range indexesToAdd {
		if remainingTimePerIndex := getRemainingTimePerIndex(logger, jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff, podsWithDelayedDeletionPerIndex[indexToAdd]); remainingTimePerIndex == 0 {
			indexesToAddNow = append(indexesToAddNow, indexToAdd)
		} else if minRemainingTimePerIndex == nil || remainingTimePerIndex < *minRemainingTimePerIndex {
			minRemainingTimePerIndex = &remainingTimePerIndex
		}
	}
	if len(indexesToAddNow) > 0 {
		return indexesToAddNow, 0
	}
	return indexesToAddNow, ptr.Deref(minRemainingTimePerIndex, 0)
}
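// activePodsForRemoval returns Pods that should be removed because there are
// too many pods running or, for an indexed Job, because there are repeated,
// invalid, or missing indexes among the pods.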
func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.Pod {
	var rm, left []*v1.Pod

	if isIndexedJob(job) {
		rm = make([]*v1.Pod, 0, rmAtLeast)
		left = make([]*v1.Pod, 0, len(pods)-rmAtLeast)
		rm, left = appendDuplicatedIndexPodsForRemoval(rm, left, pods, int(*job.Spec.Completions))
	} else {
		left = pods
	}

	if len(rm) < rmAtLeast {
		sort.Sort(controller.ActivePods(left))
		rm = append(rm, left[:rmAtLeast-len(rm)]...)
	}
	return rm
}

// updateJobStatus calls the API to update the Job status.
func (jm *Controller) updateJobStatus(ctx context.Context, job *batch.Job) (*batch.Job, error) {
	return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{})
}

func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) error {
	_, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch(
		ctx, job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
	return err
}

// getValidPodsWithFilter returns the valid pods that pass the filter.
// Pods are valid if they carry the tracking finalizer and, for indexed Jobs,
// have a valid completion index.
func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
	var result []*v1.Pod
	for _, p := range jobCtx.pods {
		uid := string(p.UID)
		// Pods that don't have the completion finalizer, are in the uncounted
		// set, or are expected to have it removed have already been accounted
		// for in the Job status.
		if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || jobCtx.expectedRmFinalizers.Has(uid) {
			continue
		}
		if isIndexedJob(jobCtx.job) {
			idx := getCompletionIndex(p.Annotations)
			if idx == unknownCompletionIndex || idx >= int(*jobCtx.job.Spec.Completions) {
				continue
			}
		}
		if filter(p) {
			result = append(result, p)
		}
	}
	return result
}

// getCompletionMode returns a string representation of the completion mode,
// used as a label value for metrics.
func getCompletionMode(job *batch.Job) string {
	if isIndexedJob(job) {
		return string(batch.IndexedCompletion)
	}
	return string(batch.NonIndexedCompletion)
}

func appendJobCompletionFinalizerIfNotFound(finalizers []string) []string {
	for _, fin := range finalizers {
		if fin == batch.JobTrackingFinalizer {
			return finalizers
		}
	}
	return append(finalizers, batch.JobTrackingFinalizer)
}

func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {
	if !hasJobTrackingFinalizer(pod) {
		return nil
	}
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"$deleteFromPrimitiveList/finalizers": []string{batch.JobTrackingFinalizer},
		},
	}
	patchBytes, _ := json.Marshal(patch)
	return patchBytes
}

type uncountedTerminatedPods struct {
	succeeded sets.Set[string]
	failed    sets.Set[string]
}

func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods {
	obj := uncountedTerminatedPods{
		succeeded: make(sets.Set[string], len(in.Succeeded)),
		failed:    make(sets.Set[string], len(in.Failed)),
	}
	for _, v := range in.Succeeded {
		obj.succeeded.Insert(string(v))
	}
	for _, v := range in.Failed {
		obj.failed.Insert(string(v))
	}
	return &obj
}

func (u *uncountedTerminatedPods) Succeeded() sets.Set[string] {
	if u == nil {
		return nil
	}
	return u.succeeded
}

func (u *uncountedTerminatedPods) Failed() sets.Set[string] {
	if u == nil {
		return nil
	}
	return u.failed
}

func errorFromChannel(errCh <-chan error) error {
	select {
	case err := <-errCh:
		return err
	default:
	}
	return nil
}
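// ensureJobConditionStatus appends or updates an existing Job condition of the
// given type with the given status value. Note that this function will not
// append to the conditions list if the new condition's status is false
// (because going from nothing to false is meaningless); it can, however,
// update the status condition to false. The function returns a bool to let the
// caller know if the list was changed (either appended or updated).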
func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) ([]batch.JobCondition, bool) {
	if condition := findConditionByType(list, cType); condition != nil {
		if condition.Status != status || condition.Reason != reason || condition.Message != message {
			*condition = *newCondition(cType, status, reason, message, now)
			return list, true
		}
		return list, false
	}
	// A condition with that type doesn't exist in the list.
	if status != v1.ConditionFalse {
		return append(list, *newCondition(cType, status, reason, message, now)), true
	}
	return list, false
}

func isPodFailed(p *v1.Pod, job *batch.Job) bool {
	if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
		// When PodDisruptionConditions is enabled, orphan Pods and unschedulable
		// terminating Pods are marked as Failed, so only the phase needs checking.
		return p.Status.Phase == v1.PodFailed
	}
	if p.Status.Phase == v1.PodFailed {
		return true
	}
	if onlyReplaceFailedPods(job) {
		return p.Status.Phase == v1.PodFailed
	}
	// Count deleted Pods as failures to account for orphan Pods that never
	// have a chance to reach the Failed phase.
	return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
}

func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition {
	for i := range list {
		if list[i].Type == cType {
			return &list[i]
		}
	}
	return nil
}

func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.JobStatus) {
	completionMode := completionModeStr(job)
	var diff int

	// For indexed Jobs the succeeded metric is computed from the completed
	// indexes rather than the succeeded counter.
	if isIndexedJob(job) {
		completions := int(*job.Spec.Completions)
		if job.Status.CompletedIndexes != oldCounters.CompletedIndexes {
			diff = indexesCount(logger, &job.Status.CompletedIndexes, completions) - indexesCount(logger, &oldCounters.CompletedIndexes, completions)
		}
		backoffLimitLabel := backoffLimitMetricsLabel(job)
		metrics.JobFinishedIndexesTotal.WithLabelValues(metrics.Succeeded, backoffLimitLabel).Add(float64(diff))
		if hasBackoffLimitPerIndex(job) && job.Status.FailedIndexes != oldCounters.FailedIndexes {
			if failedDiff := indexesCount(logger, job.Status.FailedIndexes, completions) - indexesCount(logger, oldCounters.FailedIndexes, completions); failedDiff > 0 {
				metrics.JobFinishedIndexesTotal.WithLabelValues(metrics.Failed, backoffLimitLabel).Add(float64(failedDiff))
			}
		}
	} else {
		diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded)
	}
	metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff))

	// Update the failed metric.
	diff = int(job.Status.Failed - oldCounters.Failed)
	metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
}

func indexesCount(logger klog.Logger, indexesStr *string, completions int) int {
	if indexesStr == nil {
		return 0
	}
	return parseIndexesFromString(logger, *indexesStr, completions).total()
}

func backoffLimitMetricsLabel(job *batch.Job) string {
	if hasBackoffLimitPerIndex(job) {
		return "perIndex"
	}
	return "global"
}

func recordJobPodFailurePolicyActions(job *batch.Job, podFailureCountByPolicyAction map[string]int) {
	for action, count := range podFailureCountByPolicyAction {
		metrics.PodFailuresHandledByFailurePolicy.WithLabelValues(action).Add(float64(count))
	}
}

func countReadyPods(pods []*v1.Pod) int32 {
	cnt := int32(0)
	for _, p := range pods {
		if podutil.IsPodReady(p) {
			cnt++
		}
	}
	return cnt
}
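// onlyReplaceFailedPods checks whether pod replacement should wait for pods to
// be fully terminated (phase Failed) rather than merely terminating. This is
// the case when the Job's podReplacementPolicy is Failed, or when a pod
// failure policy is specified.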
func onlyReplaceFailedPods(job *batch.Job) bool {
	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) && job.Spec.PodReplacementPolicy != nil && *job.Spec.PodReplacementPolicy == batch.Failed {
		return true
	}
	return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
}

func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
	// Listing pods shouldn't really fail, as we are just querying the informer cache.
	selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
		return
	}
	pods, _ := jm.podStore.Pods(job.Namespace).List(selector)
	for _, pod := range pods {
		if metav1.IsControlledBy(pod, job) && hasJobTrackingFinalizer(pod) {
			jm.enqueueOrphanPod(pod)
		}
	}
}

func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) {
	reason := metrics.PodCreateNew
	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
		if ptr.Deref(job.Spec.PodReplacementPolicy, batch.TerminatingOrFailed) == batch.Failed && jobCtx.failed > 0 {
			reason = metrics.PodRecreateFailed
		} else if jobCtx.failed > 0 || ptr.Deref(jobCtx.terminating, 0) > 0 {
			reason = metrics.PodRecreateTerminatingOrFailed
		}
	}
	if succeeded > 0 {
		metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Succeeded).Add(float64(succeeded))
	}
	if failed > 0 {
		metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Failed).Add(float64(failed))
	}
}

func managedByExternalController(jobObj *batch.Job) *string {
	if feature.DefaultFeatureGate.Enabled(features.JobManagedBy) {
		if controllerName := jobObj.Spec.ManagedBy; controllerName != nil && *controllerName != batch.JobControllerName {
			return controllerName
		}
	}
	return nil
}
1996
View as plain text