...

Source file src/k8s.io/kubernetes/pkg/controller/job/job_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/job

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package job
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"sort"
    24  	"sync"
    25  	"sync/atomic"
    26  	"time"
    27  
    28  	batch "k8s.io/api/batch/v1"
    29  	v1 "k8s.io/api/core/v1"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/labels"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/json"
    35  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    36  	"k8s.io/apimachinery/pkg/util/sets"
    37  	"k8s.io/apimachinery/pkg/util/wait"
    38  	"k8s.io/apiserver/pkg/util/feature"
    39  	batchinformers "k8s.io/client-go/informers/batch/v1"
    40  	coreinformers "k8s.io/client-go/informers/core/v1"
    41  	clientset "k8s.io/client-go/kubernetes"
    42  	"k8s.io/client-go/kubernetes/scheme"
    43  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    44  	batchv1listers "k8s.io/client-go/listers/batch/v1"
    45  	corelisters "k8s.io/client-go/listers/core/v1"
    46  	"k8s.io/client-go/tools/cache"
    47  	"k8s.io/client-go/tools/record"
    48  	"k8s.io/client-go/util/workqueue"
    49  	"k8s.io/klog/v2"
    50  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    51  	"k8s.io/kubernetes/pkg/controller"
    52  	"k8s.io/kubernetes/pkg/controller/job/metrics"
    53  	"k8s.io/kubernetes/pkg/features"
    54  	"k8s.io/utils/clock"
    55  	"k8s.io/utils/ptr"
    56  )
    57  
    58  // controllerKind contains the schema.GroupVersionKind for this controller type.
    59  var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
    60  
    61  var (
    62  	// syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
    63  	syncJobBatchPeriod = time.Second
    64  	// DefaultJobApiBackOff is the default API backoff period. Exported for tests.
    65  	DefaultJobApiBackOff = time.Second
    66  	// MaxJobApiBackOff is the max API backoff period. Exported for tests.
    67  	MaxJobApiBackOff = time.Minute
    68  	// DefaultJobPodFailureBackOff is the default pod failure backoff period. Exported for tests.
    69  	DefaultJobPodFailureBackOff = 10 * time.Second
    70  	// MaxJobPodFailureBackOff is the max  pod failure backoff period. Exported for tests.
    71  	MaxJobPodFailureBackOff = 10 * time.Minute
    72  	// MaxUncountedPods is the maximum size the slices in
    73  	// .status.uncountedTerminatedPods should have to keep their representation
    74  	// roughly below 20 KB. Exported for tests
    75  	MaxUncountedPods = 500
    76  	// MaxPodCreateDeletePerSync is the maximum number of pods that can be
    77  	// created or deleted in a single sync call. Exported for tests.
    78  	MaxPodCreateDeletePerSync = 500
    79  )
    80  
    81  // Controller ensures that all Job objects have corresponding pods to
    82  // run their configured workload.
    83  type Controller struct {
    84  	kubeClient clientset.Interface
    85  	podControl controller.PodControlInterface
    86  
    87  	// To allow injection of the following for testing.
    88  	updateStatusHandler func(ctx context.Context, job *batch.Job) (*batch.Job, error)
    89  	patchJobHandler     func(ctx context.Context, job *batch.Job, patch []byte) error
    90  	syncHandler         func(ctx context.Context, jobKey string) error
    91  	// podStoreSynced returns true if the pod store has been synced at least once.
    92  	// Added as a member to the struct to allow injection for testing.
    93  	podStoreSynced cache.InformerSynced
    94  	// jobStoreSynced returns true if the job store has been synced at least once.
    95  	// Added as a member to the struct to allow injection for testing.
    96  	jobStoreSynced cache.InformerSynced
    97  
    98  	// A TTLCache of pod creates/deletes each rc expects to see
    99  	expectations controller.ControllerExpectationsInterface
   100  
   101  	// finalizerExpectations tracks the Pod UIDs for which the controller
   102  	// expects to observe the tracking finalizer removed.
   103  	finalizerExpectations *uidTrackingExpectations
   104  
   105  	// A store of jobs
   106  	jobLister batchv1listers.JobLister
   107  
   108  	// A store of pods, populated by the podController
   109  	podStore corelisters.PodLister
   110  
   111  	// Jobs that need to be updated
   112  	queue workqueue.RateLimitingInterface
   113  
   114  	// Orphan deleted pods that still have a Job tracking finalizer to be removed
   115  	orphanQueue workqueue.RateLimitingInterface
   116  
   117  	broadcaster record.EventBroadcaster
   118  	recorder    record.EventRecorder
   119  
   120  	clock clock.WithTicker
   121  
   122  	// Store with information to compute the expotential backoff delay for pod
   123  	// recreation in case of pod failures.
   124  	podBackoffStore *backoffStore
   125  }
   126  
   127  type syncJobCtx struct {
   128  	job                             *batch.Job
   129  	pods                            []*v1.Pod
   130  	finishedCondition               *batch.JobCondition
   131  	activePods                      []*v1.Pod
   132  	succeeded                       int32
   133  	failed                          int32
   134  	prevSucceededIndexes            orderedIntervals
   135  	succeededIndexes                orderedIntervals
   136  	failedIndexes                   *orderedIntervals
   137  	newBackoffRecord                backoffRecord
   138  	expectedRmFinalizers            sets.Set[string]
   139  	uncounted                       *uncountedTerminatedPods
   140  	podsWithDelayedDeletionPerIndex map[int]*v1.Pod
   141  	terminating                     *int32
   142  }
   143  
   144  // NewController creates a new Job controller that keeps the relevant pods
   145  // in sync with their corresponding Job objects.
   146  func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) {
   147  	return newControllerWithClock(ctx, podInformer, jobInformer, kubeClient, &clock.RealClock{})
   148  }
   149  
   150  func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) (*Controller, error) {
   151  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   152  	logger := klog.FromContext(ctx)
   153  
   154  	jm := &Controller{
   155  		kubeClient: kubeClient,
   156  		podControl: controller.RealPodControl{
   157  			KubeClient: kubeClient,
   158  			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
   159  		},
   160  		expectations:          controller.NewControllerExpectations(),
   161  		finalizerExpectations: newUIDTrackingExpectations(),
   162  		queue:                 workqueue.NewRateLimitingQueueWithConfig(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.RateLimitingQueueConfig{Name: "job", Clock: clock}),
   163  		orphanQueue:           workqueue.NewRateLimitingQueueWithConfig(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.RateLimitingQueueConfig{Name: "job_orphan_pod", Clock: clock}),
   164  		broadcaster:           eventBroadcaster,
   165  		recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
   166  		clock:                 clock,
   167  		podBackoffStore:       newBackoffStore(),
   168  	}
   169  
   170  	if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   171  		AddFunc: func(obj interface{}) {
   172  			jm.addJob(logger, obj)
   173  		},
   174  		UpdateFunc: func(oldObj, newObj interface{}) {
   175  			jm.updateJob(logger, oldObj, newObj)
   176  		},
   177  		DeleteFunc: func(obj interface{}) {
   178  			jm.deleteJob(logger, obj)
   179  		},
   180  	}); err != nil {
   181  		return nil, fmt.Errorf("adding Job event handler: %w", err)
   182  	}
   183  	jm.jobLister = jobInformer.Lister()
   184  	jm.jobStoreSynced = jobInformer.Informer().HasSynced
   185  
   186  	if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   187  		AddFunc: func(obj interface{}) {
   188  			jm.addPod(logger, obj)
   189  		},
   190  		UpdateFunc: func(oldObj, newObj interface{}) {
   191  			jm.updatePod(logger, oldObj, newObj)
   192  		},
   193  		DeleteFunc: func(obj interface{}) {
   194  			jm.deletePod(logger, obj, true)
   195  		},
   196  	}); err != nil {
   197  		return nil, fmt.Errorf("adding Pod event handler: %w", err)
   198  	}
   199  	jm.podStore = podInformer.Lister()
   200  	jm.podStoreSynced = podInformer.Informer().HasSynced
   201  
   202  	jm.updateStatusHandler = jm.updateJobStatus
   203  	jm.patchJobHandler = jm.patchJob
   204  	jm.syncHandler = jm.syncJob
   205  
   206  	metrics.Register()
   207  
   208  	return jm, nil
   209  }
   210  
   211  // Run the main goroutine responsible for watching and syncing jobs.
   212  func (jm *Controller) Run(ctx context.Context, workers int) {
   213  	defer utilruntime.HandleCrash()
   214  	logger := klog.FromContext(ctx)
   215  
   216  	// Start events processing pipeline.
   217  	jm.broadcaster.StartStructuredLogging(3)
   218  	jm.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
   219  	defer jm.broadcaster.Shutdown()
   220  
   221  	defer jm.queue.ShutDown()
   222  	defer jm.orphanQueue.ShutDown()
   223  
   224  	logger.Info("Starting job controller")
   225  	defer logger.Info("Shutting down job controller")
   226  
   227  	if !cache.WaitForNamedCacheSync("job", ctx.Done(), jm.podStoreSynced, jm.jobStoreSynced) {
   228  		return
   229  	}
   230  
   231  	for i := 0; i < workers; i++ {
   232  		go wait.UntilWithContext(ctx, jm.worker, time.Second)
   233  	}
   234  
   235  	go wait.UntilWithContext(ctx, jm.orphanWorker, time.Second)
   236  
   237  	<-ctx.Done()
   238  }
   239  
   240  // getPodJobs returns a list of Jobs that potentially match a Pod.
   241  func (jm *Controller) getPodJobs(pod *v1.Pod) []*batch.Job {
   242  	jobs, err := jm.jobLister.GetPodJobs(pod)
   243  	if err != nil {
   244  		return nil
   245  	}
   246  	if len(jobs) > 1 {
   247  		// ControllerRef will ensure we don't do anything crazy, but more than one
   248  		// item in this list nevertheless constitutes user error.
   249  		utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
   250  	}
   251  	ret := make([]*batch.Job, 0, len(jobs))
   252  	for i := range jobs {
   253  		ret = append(ret, &jobs[i])
   254  	}
   255  	return ret
   256  }
   257  
   258  // resolveControllerRef returns the controller referenced by a ControllerRef,
   259  // or nil if the ControllerRef could not be resolved to a matching controller
   260  // of the correct Kind.
   261  func (jm *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
   262  	// We can't look up by UID, so look up by Name and then verify UID.
   263  	// Don't even try to look up by Name if it's the wrong Kind.
   264  	if controllerRef.Kind != controllerKind.Kind {
   265  		return nil
   266  	}
   267  	job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name)
   268  	if err != nil {
   269  		return nil
   270  	}
   271  	if job.UID != controllerRef.UID {
   272  		// The controller we found with this Name is not the same one that the
   273  		// ControllerRef points to.
   274  		return nil
   275  	}
   276  	return job
   277  }
   278  
   279  // When a pod is created, enqueue the controller that manages it and update its expectations.
   280  func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
   281  	pod := obj.(*v1.Pod)
   282  	recordFinishedPodWithTrackingFinalizer(nil, pod)
   283  	if pod.DeletionTimestamp != nil {
   284  		// on a restart of the controller, it's possible a new pod shows up in a state that
   285  		// is already pending deletion. Prevent the pod from being a creation observation.
   286  		jm.deletePod(logger, pod, false)
   287  		return
   288  	}
   289  
   290  	// If it has a ControllerRef, that's all that matters.
   291  	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
   292  		job := jm.resolveControllerRef(pod.Namespace, controllerRef)
   293  		if job == nil {
   294  			return
   295  		}
   296  		jobKey, err := controller.KeyFunc(job)
   297  		if err != nil {
   298  			return
   299  		}
   300  		jm.expectations.CreationObserved(logger, jobKey)
   301  		jm.enqueueSyncJobBatched(logger, job)
   302  		return
   303  	}
   304  
   305  	// Otherwise, it's an orphan.
   306  	// Clean the finalizer.
   307  	if hasJobTrackingFinalizer(pod) {
   308  		jm.enqueueOrphanPod(pod)
   309  	}
   310  	// Get a list of all matching controllers and sync
   311  	// them to see if anyone wants to adopt it.
   312  	// DO NOT observe creation because no controller should be waiting for an
   313  	// orphan.
   314  	for _, job := range jm.getPodJobs(pod) {
   315  		jm.enqueueSyncJobBatched(logger, job)
   316  	}
   317  }
   318  
   319  // When a pod is updated, figure out what job/s manage it and wake them up.
   320  // If the labels of the pod have changed we need to awaken both the old
   321  // and new job. old and cur must be *v1.Pod types.
   322  func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
   323  	curPod := cur.(*v1.Pod)
   324  	oldPod := old.(*v1.Pod)
   325  	recordFinishedPodWithTrackingFinalizer(oldPod, curPod)
   326  	if curPod.ResourceVersion == oldPod.ResourceVersion {
   327  		// Periodic resync will send update events for all known pods.
   328  		// Two different versions of the same pod will always have different RVs.
   329  		return
   330  	}
   331  	if curPod.DeletionTimestamp != nil {
   332  		// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
   333  		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
   334  		// for modification of the deletion timestamp and expect an job to create more pods asap, not wait
   335  		// until the kubelet actually deletes the pod.
   336  		jm.deletePod(logger, curPod, false)
   337  		return
   338  	}
   339  
   340  	// Don't check if oldPod has the finalizer, as during ownership transfer
   341  	// finalizers might be re-added and removed again in behalf of the new owner.
   342  	// If all those Pod updates collapse into a single event, the finalizer
   343  	// might be removed in oldPod and curPod. We want to record the latest
   344  	// state.
   345  	finalizerRemoved := !hasJobTrackingFinalizer(curPod)
   346  	curControllerRef := metav1.GetControllerOf(curPod)
   347  	oldControllerRef := metav1.GetControllerOf(oldPod)
   348  	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   349  	if controllerRefChanged && oldControllerRef != nil {
   350  		// The ControllerRef was changed. Sync the old controller, if any.
   351  		if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
   352  			if finalizerRemoved {
   353  				key, err := controller.KeyFunc(job)
   354  				if err == nil {
   355  					jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
   356  				}
   357  			}
   358  			jm.enqueueSyncJobBatched(logger, job)
   359  		}
   360  	}
   361  
   362  	// If it has a ControllerRef, that's all that matters.
   363  	if curControllerRef != nil {
   364  		job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
   365  		if job == nil {
   366  			return
   367  		}
   368  		if finalizerRemoved {
   369  			key, err := controller.KeyFunc(job)
   370  			if err == nil {
   371  				jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
   372  			}
   373  		}
   374  		jm.enqueueSyncJobBatched(logger, job)
   375  		return
   376  	}
   377  
   378  	// Otherwise, it's an orphan.
   379  	// Clean the finalizer.
   380  	if hasJobTrackingFinalizer(curPod) {
   381  		jm.enqueueOrphanPod(curPod)
   382  	}
   383  	// If anything changed, sync matching controllers
   384  	// to see if anyone wants to adopt it now.
   385  	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
   386  	if labelChanged || controllerRefChanged {
   387  		for _, job := range jm.getPodJobs(curPod) {
   388  			jm.enqueueSyncJobBatched(logger, job)
   389  		}
   390  	}
   391  }
   392  
   393  // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
   394  // obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item.
   395  func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool) {
   396  	pod, ok := obj.(*v1.Pod)
   397  	if final {
   398  		recordFinishedPodWithTrackingFinalizer(pod, nil)
   399  	}
   400  
   401  	// When a delete is dropped, the relist will notice a pod in the store not
   402  	// in the list, leading to the insertion of a tombstone object which contains
   403  	// the deleted key/value. Note that this value might be stale. If the pod
   404  	// changed labels the new job will not be woken up till the periodic resync.
   405  	if !ok {
   406  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   407  		if !ok {
   408  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
   409  			return
   410  		}
   411  		pod, ok = tombstone.Obj.(*v1.Pod)
   412  		if !ok {
   413  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
   414  			return
   415  		}
   416  	}
   417  
   418  	controllerRef := metav1.GetControllerOf(pod)
   419  	hasFinalizer := hasJobTrackingFinalizer(pod)
   420  	if controllerRef == nil {
   421  		// No controller should care about orphans being deleted.
   422  		// But this pod might have belonged to a Job and the GC removed the reference.
   423  		if hasFinalizer {
   424  			jm.enqueueOrphanPod(pod)
   425  		}
   426  		return
   427  	}
   428  	job := jm.resolveControllerRef(pod.Namespace, controllerRef)
   429  	if job == nil || IsJobFinished(job) {
   430  		// syncJob will not remove this finalizer.
   431  		if hasFinalizer {
   432  			jm.enqueueOrphanPod(pod)
   433  		}
   434  		return
   435  	}
   436  	jobKey, err := controller.KeyFunc(job)
   437  	if err != nil {
   438  		return
   439  	}
   440  	jm.expectations.DeletionObserved(logger, jobKey)
   441  
   442  	// Consider the finalizer removed if this is the final delete. Otherwise,
   443  	// it's an update for the deletion timestamp, then check finalizer.
   444  	if final || !hasFinalizer {
   445  		jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
   446  	}
   447  
   448  	jm.enqueueSyncJobBatched(logger, job)
   449  }
   450  
   451  func (jm *Controller) addJob(logger klog.Logger, obj interface{}) {
   452  	jm.enqueueSyncJobImmediately(logger, obj)
   453  	jobObj, ok := obj.(*batch.Job)
   454  	if !ok {
   455  		return
   456  	}
   457  	if controllerName := managedByExternalController(jobObj); controllerName != nil {
   458  		metrics.JobByExternalControllerTotal.WithLabelValues(*controllerName).Inc()
   459  	}
   460  }
   461  
   462  func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
   463  	oldJob := old.(*batch.Job)
   464  	curJob := cur.(*batch.Job)
   465  
   466  	// never return error
   467  	key, err := controller.KeyFunc(curJob)
   468  	if err != nil {
   469  		return
   470  	}
   471  
   472  	if curJob.Generation == oldJob.Generation {
   473  		// Delay the Job sync when no generation change to batch Job status updates,
   474  		// typically triggered by pod events.
   475  		jm.enqueueSyncJobBatched(logger, curJob)
   476  	} else {
   477  		// Trigger immediate sync when spec is changed.
   478  		jm.enqueueSyncJobImmediately(logger, curJob)
   479  	}
   480  
   481  	// The job shouldn't be marked as finished until all pod finalizers are removed.
   482  	// This is a backup operation in this case.
   483  	if IsJobFinished(curJob) {
   484  		jm.cleanupPodFinalizers(curJob)
   485  	}
   486  
   487  	// check if need to add a new rsync for ActiveDeadlineSeconds
   488  	if curJob.Status.StartTime != nil {
   489  		curADS := curJob.Spec.ActiveDeadlineSeconds
   490  		if curADS == nil {
   491  			return
   492  		}
   493  		oldADS := oldJob.Spec.ActiveDeadlineSeconds
   494  		if oldADS == nil || *oldADS != *curADS {
   495  			passed := jm.clock.Since(curJob.Status.StartTime.Time)
   496  			total := time.Duration(*curADS) * time.Second
   497  			// AddAfter will handle total < passed
   498  			jm.queue.AddAfter(key, total-passed)
   499  			logger.V(4).Info("job's ActiveDeadlineSeconds updated, will rsync", "key", key, "interval", total-passed)
   500  		}
   501  	}
   502  }
   503  
   504  // deleteJob enqueues the job and all the pods associated with it that still
   505  // have a finalizer.
   506  func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
   507  	jm.enqueueSyncJobImmediately(logger, obj)
   508  	jobObj, ok := obj.(*batch.Job)
   509  	if !ok {
   510  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   511  		if !ok {
   512  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
   513  			return
   514  		}
   515  		jobObj, ok = tombstone.Obj.(*batch.Job)
   516  		if !ok {
   517  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a job %+v", obj))
   518  			return
   519  		}
   520  	}
   521  	jm.cleanupPodFinalizers(jobObj)
   522  }
   523  
   524  // enqueueSyncJobImmediately tells the Job controller to invoke syncJob
   525  // immediately.
   526  // It is only used for Job events (creation, deletion, spec update).
   527  // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
   528  func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interface{}) {
   529  	jm.enqueueSyncJobInternal(logger, obj, 0)
   530  }
   531  
   532  // enqueueSyncJobBatched tells the controller to invoke syncJob with a
   533  // constant batching delay.
   534  // It is used for:
   535  // - Pod events (creation, deletion, update)
   536  // - Job status update
   537  // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
   538  func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
   539  	jm.enqueueSyncJobInternal(logger, obj, syncJobBatchPeriod)
   540  }
   541  
   542  // enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
   543  // custom delay, but not smaller than the batching delay.
   544  // It is used when pod recreations are delayed due to pod failures.
   545  // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
   546  func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
   547  	if delay < syncJobBatchPeriod {
   548  		delay = syncJobBatchPeriod
   549  	}
   550  	jm.enqueueSyncJobInternal(logger, obj, delay)
   551  }
   552  
   553  func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{}, delay time.Duration) {
   554  	key, err := controller.KeyFunc(obj)
   555  	if err != nil {
   556  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
   557  		return
   558  	}
   559  
   560  	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
   561  	// deterministically avoid syncing controllers that fight over pods. Currently, we only
   562  	// ensure that the same controller is synced for a given pod. When we periodically relist
   563  	// all controllers there will still be some replica instability. One way to handle this is
   564  	// by querying the store for all controllers that this rc overlaps, as well as all
   565  	// controllers that overlap this rc, and sorting them.
   566  	logger.Info("enqueueing job", "key", key)
   567  	jm.queue.AddAfter(key, delay)
   568  }
   569  
   570  func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) {
   571  	key, err := controller.KeyFunc(obj)
   572  	if err != nil {
   573  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
   574  		return
   575  	}
   576  	jm.orphanQueue.Add(key)
   577  }
   578  
   579  // worker runs a worker thread that just dequeues items, processes them, and marks them done.
   580  // It enforces that the syncHandler is never invoked concurrently with the same key.
   581  func (jm *Controller) worker(ctx context.Context) {
   582  	for jm.processNextWorkItem(ctx) {
   583  	}
   584  }
   585  
   586  func (jm *Controller) processNextWorkItem(ctx context.Context) bool {
   587  	key, quit := jm.queue.Get()
   588  	if quit {
   589  		return false
   590  	}
   591  	defer jm.queue.Done(key)
   592  
   593  	err := jm.syncHandler(ctx, key.(string))
   594  	if err == nil {
   595  		jm.queue.Forget(key)
   596  		return true
   597  	}
   598  
   599  	utilruntime.HandleError(fmt.Errorf("syncing job: %w", err))
   600  	jm.queue.AddRateLimited(key)
   601  
   602  	return true
   603  }
   604  
   605  func (jm *Controller) orphanWorker(ctx context.Context) {
   606  	for jm.processNextOrphanPod(ctx) {
   607  	}
   608  }
   609  
   610  func (jm *Controller) processNextOrphanPod(ctx context.Context) bool {
   611  	key, quit := jm.orphanQueue.Get()
   612  	if quit {
   613  		return false
   614  	}
   615  	defer jm.orphanQueue.Done(key)
   616  	err := jm.syncOrphanPod(ctx, key.(string))
   617  	if err != nil {
   618  		utilruntime.HandleError(fmt.Errorf("Error syncing orphan pod: %v", err))
   619  		jm.orphanQueue.AddRateLimited(key)
   620  	} else {
   621  		jm.orphanQueue.Forget(key)
   622  	}
   623  
   624  	return true
   625  }
   626  
   627  // syncOrphanPod removes the tracking finalizer from an orphan pod if found.
   628  func (jm *Controller) syncOrphanPod(ctx context.Context, key string) error {
   629  	startTime := jm.clock.Now()
   630  	logger := klog.FromContext(ctx)
   631  	defer func() {
   632  		logger.V(4).Info("Finished syncing orphan pod", "pod", key, "elapsed", jm.clock.Since(startTime))
   633  	}()
   634  
   635  	ns, name, err := cache.SplitMetaNamespaceKey(key)
   636  	if err != nil {
   637  		return err
   638  	}
   639  
   640  	sharedPod, err := jm.podStore.Pods(ns).Get(name)
   641  	if err != nil {
   642  		if apierrors.IsNotFound(err) {
   643  			logger.V(4).Info("Orphan pod has been deleted", "pod", key)
   644  			return nil
   645  		}
   646  		return err
   647  	}
   648  	// Make sure the pod is still orphaned.
   649  	if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil {
   650  		job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef)
   651  		if job != nil {
   652  			// Skip cleanup of finalizers for pods owned by a job managed by an external controller
   653  			if controllerName := managedByExternalController(job); controllerName != nil {
   654  				logger.V(2).Info("Skip cleanup of the job finalizer for a pod owned by a job that is managed by an external controller", "key", key, "podUID", sharedPod.UID, "jobUID", job.UID, "controllerName", controllerName)
   655  				return nil
   656  			}
   657  		}
   658  		if job != nil && !IsJobFinished(job) {
   659  			// The pod was adopted. Do not remove finalizer.
   660  			return nil
   661  		}
   662  	}
   663  	if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil {
   664  		if err := jm.podControl.PatchPod(ctx, ns, name, patch); err != nil && !apierrors.IsNotFound(err) {
   665  			return err
   666  		}
   667  	}
   668  	return nil
   669  }
   670  
   671  // getPodsForJob returns the set of pods that this Job should manage.
   672  // It also reconciles ControllerRef by adopting/orphaning, adding tracking
   673  // finalizers.
   674  // Note that the returned Pods are pointers into the cache.
   675  func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Pod, error) {
   676  	selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
   677  	if err != nil {
   678  		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
   679  	}
   680  	// List all pods to include those that don't match the selector anymore
   681  	// but have a ControllerRef pointing to this controller.
   682  	pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
   683  	if err != nil {
   684  		return nil, err
   685  	}
   686  	// If any adoptions are attempted, we should first recheck for deletion
   687  	// with an uncached quorum read sometime after listing Pods (see #42639).
   688  	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
   689  		fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
   690  		if err != nil {
   691  			return nil, err
   692  		}
   693  		if fresh.UID != j.UID {
   694  			return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID)
   695  		}
   696  		return fresh, nil
   697  	})
   698  	cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, batch.JobTrackingFinalizer)
   699  	// When adopting Pods, this operation adds an ownerRef and finalizers.
   700  	pods, err = cm.ClaimPods(ctx, pods)
   701  	if err != nil {
   702  		return pods, err
   703  	}
   704  	// Set finalizer on adopted pods for the remaining calculations.
   705  	for i, p := range pods {
   706  		adopted := true
   707  		for _, r := range p.OwnerReferences {
   708  			if r.UID == j.UID {
   709  				adopted = false
   710  				break
   711  			}
   712  		}
   713  		if adopted && !hasJobTrackingFinalizer(p) {
   714  			pods[i] = p.DeepCopy()
   715  			pods[i].Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
   716  		}
   717  	}
   718  	return pods, err
   719  }
   720  
   721  // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
   722  // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
   723  // concurrently with the same key.
   724  func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
   725  	startTime := jm.clock.Now()
   726  	logger := klog.FromContext(ctx)
   727  	defer func() {
   728  		logger.V(4).Info("Finished syncing job", "key", key, "elapsed", jm.clock.Since(startTime))
   729  	}()
   730  
   731  	ns, name, err := cache.SplitMetaNamespaceKey(key)
   732  	if err != nil {
   733  		return err
   734  	}
   735  	if len(ns) == 0 || len(name) == 0 {
   736  		return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
   737  	}
   738  	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
   739  	if err != nil {
   740  		if apierrors.IsNotFound(err) {
   741  			logger.V(4).Info("Job has been deleted", "key", key)
   742  			jm.expectations.DeleteExpectations(logger, key)
   743  			jm.finalizerExpectations.deleteExpectations(logger, key)
   744  
   745  			err := jm.podBackoffStore.removeBackoffRecord(key)
   746  			if err != nil {
   747  				// re-syncing here as the record has to be removed for finished/deleted jobs
   748  				return fmt.Errorf("error removing backoff record %w", err)
   749  			}
   750  			return nil
   751  		}
   752  		return err
   753  	}
   754  
   755  	// Skip syncing of the job it is managed by another controller.
   756  	// We cannot rely solely on skipping of queueing such jobs for synchronization,
   757  	// because it is possible a synchronization task is queued for a job, without
   758  	// the managedBy field, but the job is quickly replaced by another job with
   759  	// the field. Then, the syncJob might be invoked for a job with the field.
   760  	if controllerName := managedByExternalController(sharedJob); controllerName != nil {
   761  		logger.V(2).Info("Skip syncing the job as it is managed by an external controller", "key", key, "uid", sharedJob.UID, "controllerName", controllerName)
   762  		return nil
   763  	}
   764  
   765  	// make a copy so we don't mutate the shared cache
   766  	job := *sharedJob.DeepCopy()
   767  
   768  	// if job was finished previously, we don't want to redo the termination
   769  	if IsJobFinished(&job) {
   770  		err := jm.podBackoffStore.removeBackoffRecord(key)
   771  		if err != nil {
   772  			// re-syncing here as the record has to be removed for finished/deleted jobs
   773  			return fmt.Errorf("error removing backoff record %w", err)
   774  		}
   775  		return nil
   776  	}
   777  
   778  	if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion {
   779  		jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown")
   780  		return nil
   781  	}
   782  
   783  	completionMode := getCompletionMode(&job)
   784  	action := metrics.JobSyncActionReconciling
   785  
   786  	defer func() {
   787  		result := "success"
   788  		if rErr != nil {
   789  			result = "error"
   790  		}
   791  
   792  		metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(jm.clock.Since(startTime).Seconds())
   793  		metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
   794  	}()
   795  
   796  	if job.Status.UncountedTerminatedPods == nil {
   797  		job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
   798  	}
   799  
   800  	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
   801  	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
   802  	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
   803  	satisfiedExpectations := jm.expectations.SatisfiedExpectations(logger, key)
   804  
   805  	pods, err := jm.getPodsForJob(ctx, &job)
   806  	if err != nil {
   807  		return err
   808  	}
   809  	var terminating *int32
   810  	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
   811  		terminating = ptr.To(controller.CountTerminatingPods(pods))
   812  	}
   813  	jobCtx := &syncJobCtx{
   814  		job:                  &job,
   815  		pods:                 pods,
   816  		activePods:           controller.FilterActivePods(logger, pods),
   817  		terminating:          terminating,
   818  		uncounted:            newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
   819  		expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
   820  	}
   821  	active := int32(len(jobCtx.activePods))
   822  	newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
   823  	jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
   824  	jobCtx.failed = job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed))
   825  	var ready *int32
   826  	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
   827  		ready = ptr.To(countReadyPods(jobCtx.activePods))
   828  	}
   829  
   830  	// Job first start. Set StartTime only if the job is not in the suspended state.
   831  	if job.Status.StartTime == nil && !jobSuspended(&job) {
   832  		now := metav1.NewTime(jm.clock.Now())
   833  		job.Status.StartTime = &now
   834  	}
   835  
   836  	jobCtx.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
   837  
   838  	var manageJobErr error
   839  
   840  	exceedsBackoffLimit := jobCtx.failed > *job.Spec.BackoffLimit
   841  	jobCtx.finishedCondition = hasSuccessCriteriaMetCondition(&job)
   842  
   843  	// Given that the Job already has the SuccessCriteriaMet condition, the termination condition already had confirmed in another cycle.
   844  	// So, the job-controller evaluates the podFailurePolicy only when the Job doesn't have the SuccessCriteriaMet condition.
   845  	if jobCtx.finishedCondition == nil && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
   846  		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
   847  			jobCtx.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
   848  		} else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
   849  			// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
   850  			jobCtx.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, batch.JobReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
   851  		}
   852  	}
   853  	if jobCtx.finishedCondition == nil {
   854  		if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
   855  			// check if the number of pod restart exceeds backoff (for restart OnFailure only)
   856  			// OR if the number of failed jobs increased since the last syncJob
   857  			jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit", jm.clock.Now())
   858  		} else if jm.pastActiveDeadline(&job) {
   859  			jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline", jm.clock.Now())
   860  		} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
   861  			syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
   862  			logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
   863  			jm.queue.AddAfter(key, syncDuration)
   864  		}
   865  	}
   866  
   867  	if isIndexedJob(&job) {
   868  		jobCtx.prevSucceededIndexes, jobCtx.succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
   869  		jobCtx.succeeded = int32(jobCtx.succeededIndexes.total())
   870  		if hasBackoffLimitPerIndex(&job) {
   871  			jobCtx.failedIndexes = calculateFailedIndexes(logger, &job, pods)
   872  			if jobCtx.finishedCondition == nil {
   873  				if job.Spec.MaxFailedIndexes != nil && jobCtx.failedIndexes.total() > int(*job.Spec.MaxFailedIndexes) {
   874  					jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonMaxFailedIndexesExceeded, "Job has exceeded the specified maximal number of failed indexes", jm.clock.Now())
   875  				} else if jobCtx.failedIndexes.total() > 0 && jobCtx.failedIndexes.total()+jobCtx.succeededIndexes.total() >= int(*job.Spec.Completions) {
   876  					jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonFailedIndexes, "Job has failed indexes", jm.clock.Now())
   877  				}
   878  			}
   879  			jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
   880  		}
   881  		if jobCtx.finishedCondition == nil && hasSuccessCriteriaMetCondition(jobCtx.job) == nil {
   882  			if msg, met := matchSuccessPolicy(logger, job.Spec.SuccessPolicy, *job.Spec.Completions, jobCtx.succeededIndexes); met {
   883  				jobCtx.finishedCondition = newCondition(batch.JobSuccessCriteriaMet, v1.ConditionTrue, batch.JobReasonSuccessPolicy, msg, jm.clock.Now())
   884  			}
   885  		}
   886  	}
   887  	suspendCondChanged := false
   888  	// Remove active pods if Job failed.
   889  	if jobCtx.finishedCondition != nil {
   890  		deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods)
   891  		if deleted != active || !satisfiedExpectations {
   892  			// Can't declare the Job as finished yet, as there might be remaining
   893  			// pod finalizers or pods that are not in the informer's cache yet.
   894  			jobCtx.finishedCondition = nil
   895  		}
   896  		active -= deleted
   897  		manageJobErr = err
   898  	} else {
   899  		manageJobCalled := false
   900  		if satisfiedExpectations && job.DeletionTimestamp == nil {
   901  			active, action, manageJobErr = jm.manageJob(ctx, &job, jobCtx)
   902  			manageJobCalled = true
   903  		}
   904  		complete := false
   905  		if job.Spec.Completions == nil {
   906  			// This type of job is complete when any pod exits with success.
   907  			// Each pod is capable of
   908  			// determining whether or not the entire Job is done.  Subsequent pods are
   909  			// not expected to fail, but if they do, the failure is ignored.  Once any
   910  			// pod succeeds, the controller waits for remaining pods to finish, and
   911  			// then the job is complete.
   912  			complete = jobCtx.succeeded > 0 && active == 0
   913  		} else {
   914  			// Job specifies a number of completions.  This type of job signals
   915  			// success by having that number of successes.  Since we do not
   916  			// start more pods than there are remaining completions, there should
   917  			// not be any remaining active pods once this count is reached.
   918  			complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
   919  		}
   920  		if complete {
   921  			jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
   922  		} else if manageJobCalled {
   923  			// Update the conditions / emit events only if manageJob was called in
   924  			// this syncJob. Otherwise wait for the right syncJob call to make
   925  			// updates.
   926  			if job.Spec.Suspend != nil && *job.Spec.Suspend {
   927  				// Job can be in the suspended state only if it is NOT completed.
   928  				var isUpdated bool
   929  				job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", jm.clock.Now())
   930  				if isUpdated {
   931  					suspendCondChanged = true
   932  					jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
   933  				}
   934  			} else {
   935  				// Job not suspended.
   936  				var isUpdated bool
   937  				job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed", jm.clock.Now())
   938  				if isUpdated {
   939  					suspendCondChanged = true
   940  					jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
   941  					// Resumed jobs will always reset StartTime to current time. This is
   942  					// done because the ActiveDeadlineSeconds timer shouldn't go off
   943  					// whilst the Job is still suspended and resetting StartTime is
   944  					// consistent with resuming a Job created in the suspended state.
   945  					// (ActiveDeadlineSeconds is interpreted as the number of seconds a
   946  					// Job is continuously active.)
   947  					now := metav1.NewTime(jm.clock.Now())
   948  					job.Status.StartTime = &now
   949  				}
   950  			}
   951  		}
   952  	}
   953  
   954  	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
   955  	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
   956  	job.Status.Active = active
   957  	job.Status.Ready = ready
   958  	job.Status.Terminating = jobCtx.terminating
   959  	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
   960  	if err != nil {
   961  		return fmt.Errorf("tracking status: %w", err)
   962  	}
   963  
   964  	return manageJobErr
   965  }
   966  
   967  // deleteActivePods issues deletion for active Pods, preserving finalizers.
   968  // This is done through DELETE calls that set deletion timestamps.
   969  // The method trackJobStatusAndRemoveFinalizers removes the finalizers, after
   970  // which the objects can actually be deleted.
   971  // Returns number of successfully deletions issued.
   972  func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, error) {
   973  	errCh := make(chan error, len(pods))
   974  	successfulDeletes := int32(len(pods))
   975  	wg := sync.WaitGroup{}
   976  	wg.Add(len(pods))
   977  	for i := range pods {
   978  		go func(pod *v1.Pod) {
   979  			defer wg.Done()
   980  			if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil && !apierrors.IsNotFound(err) {
   981  				atomic.AddInt32(&successfulDeletes, -1)
   982  				errCh <- err
   983  				utilruntime.HandleError(err)
   984  			}
   985  		}(pods[i])
   986  	}
   987  	wg.Wait()
   988  	return successfulDeletes, errorFromChannel(errCh)
   989  }
   990  
   991  func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int {
   992  	result := len(failedPods)
   993  	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
   994  		for _, p := range failedPods {
   995  			_, countFailed, _ := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, p)
   996  			if !countFailed {
   997  				result--
   998  			}
   999  		}
  1000  	}
  1001  	return result
  1002  }
  1003  
  1004  // deleteJobPods deletes the pods, returns the number of successful removals
  1005  // and any error.
  1006  func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
  1007  	errCh := make(chan error, len(pods))
  1008  	successfulDeletes := int32(len(pods))
  1009  	logger := klog.FromContext(ctx)
  1010  
  1011  	failDelete := func(pod *v1.Pod, err error) {
  1012  		// Decrement the expected number of deletes because the informer won't observe this deletion
  1013  		jm.expectations.DeletionObserved(logger, jobKey)
  1014  		if !apierrors.IsNotFound(err) {
  1015  			logger.V(2).Info("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err)
  1016  			atomic.AddInt32(&successfulDeletes, -1)
  1017  			errCh <- err
  1018  			utilruntime.HandleError(err)
  1019  		}
  1020  	}
  1021  
  1022  	wg := sync.WaitGroup{}
  1023  	wg.Add(len(pods))
  1024  	for i := range pods {
  1025  		go func(pod *v1.Pod) {
  1026  			defer wg.Done()
  1027  			if patch := removeTrackingFinalizerPatch(pod); patch != nil {
  1028  				if err := jm.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patch); err != nil {
  1029  					failDelete(pod, fmt.Errorf("removing completion finalizer: %w", err))
  1030  					return
  1031  				}
  1032  			}
  1033  			if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil {
  1034  				failDelete(pod, err)
  1035  			}
  1036  		}(pods[i])
  1037  	}
  1038  	wg.Wait()
  1039  	return successfulDeletes, errorFromChannel(errCh)
  1040  }
  1041  
  1042  // trackJobStatusAndRemoveFinalizers does:
  1043  //  1. Add finished Pods to .status.uncountedTerminatedPods
  1044  //  2. Remove the finalizers from the Pods if they completed or were removed
  1045  //     or the job was removed.
  1046  //  3. Increment job counters for pods that no longer have a finalizer.
  1047  //  4. Add Complete condition if satisfied with current counters.
  1048  //
  1049  // It does this up to a limited number of Pods so that the size of .status
  1050  // doesn't grow too much and this sync doesn't starve other Jobs.
  1051  func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, needsFlush bool) error {
  1052  	logger := klog.FromContext(ctx)
  1053  
  1054  	isIndexed := isIndexedJob(jobCtx.job)
  1055  	var podsToRemoveFinalizer []*v1.Pod
  1056  	uncountedStatus := jobCtx.job.Status.UncountedTerminatedPods
  1057  	var newSucceededIndexes []int
  1058  	if isIndexed {
  1059  		// Sort to introduce completed Indexes in order.
  1060  		sort.Sort(byCompletionIndex(jobCtx.pods))
  1061  	}
  1062  	uidsWithFinalizer := make(sets.Set[string], len(jobCtx.pods))
  1063  	for _, p := range jobCtx.pods {
  1064  		uid := string(p.UID)
  1065  		if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(uid) {
  1066  			uidsWithFinalizer.Insert(uid)
  1067  		}
  1068  	}
  1069  
  1070  	// Shallow copy, as it will only be used to detect changes in the counters.
  1071  	oldCounters := jobCtx.job.Status
  1072  	if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) {
  1073  		needsFlush = true
  1074  	}
  1075  	podFailureCountByPolicyAction := map[string]int{}
  1076  	reachedMaxUncountedPods := false
  1077  	for _, pod := range jobCtx.pods {
  1078  		if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(string(pod.UID)) {
  1079  			// This pod was processed in a previous sync.
  1080  			continue
  1081  		}
  1082  		considerPodFailed := isPodFailed(pod, jobCtx.job)
  1083  		if !canRemoveFinalizer(logger, jobCtx, pod, considerPodFailed) {
  1084  			continue
  1085  		}
  1086  		podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
  1087  		if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) {
  1088  			if isIndexed {
  1089  				// The completion index is enough to avoid recounting succeeded pods.
  1090  				// No need to track UIDs.
  1091  				ix := getCompletionIndex(pod.Annotations)
  1092  				if ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) && !jobCtx.prevSucceededIndexes.has(ix) {
  1093  					newSucceededIndexes = append(newSucceededIndexes, ix)
  1094  					needsFlush = true
  1095  				}
  1096  			} else if !jobCtx.uncounted.succeeded.Has(string(pod.UID)) {
  1097  				needsFlush = true
  1098  				uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID)
  1099  			}
  1100  		} else if considerPodFailed || (jobCtx.finishedCondition != nil && !isSuccessCriteriaMetCondition(jobCtx.finishedCondition)) {
  1101  			// When the job is considered finished, every non-terminated pod is considered failed.
  1102  			ix := getCompletionIndex(pod.Annotations)
  1103  			if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
  1104  				if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
  1105  					_, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod)
  1106  					if action != nil {
  1107  						podFailureCountByPolicyAction[string(*action)] += 1
  1108  					}
  1109  					if countFailed {
  1110  						needsFlush = true
  1111  						uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
  1112  					}
  1113  				} else {
  1114  					needsFlush = true
  1115  					uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
  1116  				}
  1117  			}
  1118  		}
  1119  		if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= MaxUncountedPods {
  1120  			// The controller added enough Pods already to .status.uncountedTerminatedPods
  1121  			// We stop counting pods and removing finalizers here to:
  1122  			// 1. Ensure that the UIDs representation are under 20 KB.
  1123  			// 2. Cap the number of finalizer removals so that syncing of big Jobs
  1124  			//    doesn't starve smaller ones.
  1125  			//
  1126  			// The job will be synced again because the Job status and Pod updates
  1127  			// will put the Job back to the work queue.
  1128  			reachedMaxUncountedPods = true
  1129  			break
  1130  		}
  1131  	}
  1132  	if isIndexed {
  1133  		jobCtx.succeededIndexes = jobCtx.succeededIndexes.withOrderedIndexes(newSucceededIndexes)
  1134  		succeededIndexesStr := jobCtx.succeededIndexes.String()
  1135  		if succeededIndexesStr != jobCtx.job.Status.CompletedIndexes {
  1136  			needsFlush = true
  1137  		}
  1138  		jobCtx.job.Status.Succeeded = int32(jobCtx.succeededIndexes.total())
  1139  		jobCtx.job.Status.CompletedIndexes = succeededIndexesStr
  1140  		var failedIndexesStr *string
  1141  		if jobCtx.failedIndexes != nil {
  1142  			failedIndexesStr = ptr.To(jobCtx.failedIndexes.String())
  1143  		}
  1144  		if !ptr.Equal(jobCtx.job.Status.FailedIndexes, failedIndexesStr) {
  1145  			jobCtx.job.Status.FailedIndexes = failedIndexesStr
  1146  			needsFlush = true
  1147  		}
  1148  	}
  1149  	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
  1150  		if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget {
  1151  
  1152  			// Append the interim FailureTarget condition to update the job status with before finalizers are removed.
  1153  			jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition)
  1154  			needsFlush = true
  1155  
  1156  			// Prepare the final Failed condition to update the job status with after the finalizers are removed.
  1157  			// It is also used in the enactJobFinished function for reporting.
  1158  			jobCtx.finishedCondition = newFailedConditionForFailureTarget(jobCtx.finishedCondition, jm.clock.Now())
  1159  		}
  1160  	}
  1161  	if isSuccessCriteriaMetCondition(jobCtx.finishedCondition) {
  1162  		// Append the interim SuccessCriteriaMet condition to update the job status with before finalizers are removed.
  1163  		if hasSuccessCriteriaMetCondition(jobCtx.job) == nil {
  1164  			jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition)
  1165  			needsFlush = true
  1166  		}
  1167  
  1168  		// Prepare the final Complete condition to update the job status with after the finalizers are removed.
  1169  		// It is also used in the enactJobFinished function for reporting.
  1170  		jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, jobCtx.finishedCondition.Reason, jobCtx.finishedCondition.Message, jm.clock.Now())
  1171  	}
  1172  	var err error
  1173  	if jobCtx.job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, jobCtx, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil {
  1174  		return err
  1175  	}
  1176  	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(jobCtx.job, jobCtx.finishedCondition)
  1177  	if jobFinished {
  1178  		needsFlush = true
  1179  	}
  1180  	if needsFlush {
  1181  		if _, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
  1182  			return fmt.Errorf("removing uncounted pods from status: %w", err)
  1183  		}
  1184  		if jobFinished {
  1185  			jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
  1186  		}
  1187  		recordJobPodFinished(logger, jobCtx.job, oldCounters)
  1188  	}
  1189  	return nil
  1190  }
  1191  
  1192  // canRemoveFinalizer determines if the pod's finalizer can be safely removed.
  1193  // The finalizer can be removed when:
  1194  //   - the entire Job is terminating; or
  1195  //   - the pod's index is succeeded; or
  1196  //   - the Pod is considered failed, unless it's removal is delayed for the
  1197  //     purpose of transferring the JobIndexFailureCount annotations to the
  1198  //     replacement pod. the entire Job is terminating the finalizer can be
  1199  //     removed unconditionally; or
  1200  //   - the Job met successPolicy.
  1201  func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, considerPodFailed bool) bool {
  1202  	if jobCtx.job.DeletionTimestamp != nil || jobCtx.finishedCondition != nil || pod.Status.Phase == v1.PodSucceeded {
  1203  		return true
  1204  	}
  1205  	if !considerPodFailed {
  1206  		return false
  1207  	}
  1208  	if hasBackoffLimitPerIndex(jobCtx.job) {
  1209  		if index := getCompletionIndex(pod.Annotations); index != unknownCompletionIndex {
  1210  			if p, ok := jobCtx.podsWithDelayedDeletionPerIndex[index]; ok && p.UID == pod.UID {
  1211  				logger.V(3).Info("Delaying pod finalizer removal to await for pod recreation within the index", "pod", klog.KObj(pod))
  1212  				return false
  1213  			}
  1214  		}
  1215  	}
  1216  	return true
  1217  }
  1218  
  1219  // flushUncountedAndRemoveFinalizers does:
  1220  //  1. flush the Job status that might include new uncounted Pod UIDs.
  1221  //     Also flush the interim FailureTarget and SuccessCriteriaMet conditions if present.
  1222  //  2. perform the removal of finalizers from Pods which are in the uncounted
  1223  //     lists.
  1224  //  3. update the counters based on the Pods for which it successfully removed
  1225  //     the finalizers.
  1226  //  4. (if not all removals succeeded) flush Job status again.
  1227  //
  1228  // Returns whether there are pending changes in the Job status that need to be
  1229  // flushed in subsequent calls.
  1230  func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
  1231  	logger := klog.FromContext(ctx)
  1232  	var err error
  1233  	if needsFlush {
  1234  		if jobCtx.job, err = jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
  1235  			return jobCtx.job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
  1236  		}
  1237  
  1238  		err = jm.podBackoffStore.updateBackoffRecord(jobCtx.newBackoffRecord)
  1239  
  1240  		if err != nil {
  1241  			// this error might undercount the backoff.
  1242  			// re-syncing from the current state might not help to recover
  1243  			// the backoff information
  1244  			logger.Error(err, "Backoff update failed")
  1245  		}
  1246  
  1247  		recordJobPodFinished(logger, jobCtx.job, *oldCounters)
  1248  		// Shallow copy, as it will only be used to detect changes in the counters.
  1249  		*oldCounters = jobCtx.job.Status
  1250  		needsFlush = false
  1251  	}
  1252  	recordJobPodFailurePolicyActions(jobCtx.job, podFailureCountByPolicyAction)
  1253  
  1254  	jobKey, err := controller.KeyFunc(jobCtx.job)
  1255  	if err != nil {
  1256  		return jobCtx.job, needsFlush, fmt.Errorf("getting job key: %w", err)
  1257  	}
  1258  	var rmErr error
  1259  	if len(podsToRemoveFinalizer) > 0 {
  1260  		var rmSucceded []bool
  1261  		rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(ctx, jobKey, podsToRemoveFinalizer)
  1262  		for i, p := range podsToRemoveFinalizer {
  1263  			if rmSucceded[i] {
  1264  				uidsWithFinalizer.Delete(string(p.UID))
  1265  			}
  1266  		}
  1267  	}
  1268  	// Failed to remove some finalizers. Attempt to update the status with the
  1269  	// partial progress.
  1270  	if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) {
  1271  		needsFlush = true
  1272  	}
  1273  	if rmErr != nil && needsFlush {
  1274  		if job, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
  1275  			return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
  1276  		}
  1277  	}
  1278  	return jobCtx.job, needsFlush, rmErr
  1279  }
  1280  
  1281  // cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
  1282  // .status.uncountedTerminatedPods for which the finalizer was successfully
  1283  // removed and increments the corresponding status counters.
  1284  // Returns whether there was any status change.
  1285  func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool {
  1286  	updated := false
  1287  	uncountedStatus := status.UncountedTerminatedPods
  1288  	newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer)
  1289  	if len(newUncounted) != len(uncountedStatus.Succeeded) {
  1290  		updated = true
  1291  		status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted))
  1292  		uncountedStatus.Succeeded = newUncounted
  1293  	}
  1294  	newUncounted = filterInUncountedUIDs(uncountedStatus.Failed, uidsWithFinalizer)
  1295  	if len(newUncounted) != len(uncountedStatus.Failed) {
  1296  		updated = true
  1297  		status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted))
  1298  		uncountedStatus.Failed = newUncounted
  1299  	}
  1300  	return updated
  1301  }
  1302  
  1303  // removeTrackingFinalizerFromPods removes tracking finalizers from Pods and
  1304  // returns an array of booleans where the i-th value is true if the finalizer
  1305  // of the i-th Pod was successfully removed (if the pod was deleted when this
  1306  // function was called, it's considered as the finalizer was removed successfully).
  1307  func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey string, pods []*v1.Pod) ([]bool, error) {
  1308  	logger := klog.FromContext(ctx)
  1309  	errCh := make(chan error, len(pods))
  1310  	succeeded := make([]bool, len(pods))
  1311  	uids := make([]string, len(pods))
  1312  	for i, p := range pods {
  1313  		uids[i] = string(p.UID)
  1314  	}
  1315  	if jobKey != "" {
  1316  		err := jm.finalizerExpectations.expectFinalizersRemoved(logger, jobKey, uids)
  1317  		if err != nil {
  1318  			return succeeded, fmt.Errorf("setting expected removed finalizers: %w", err)
  1319  		}
  1320  	}
  1321  	wg := sync.WaitGroup{}
  1322  	wg.Add(len(pods))
  1323  	for i := range pods {
  1324  		go func(i int) {
  1325  			pod := pods[i]
  1326  			defer wg.Done()
  1327  			if patch := removeTrackingFinalizerPatch(pod); patch != nil {
  1328  				if err := jm.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patch); err != nil {
  1329  					// In case of any failure, we don't expect a Pod update for the
  1330  					// finalizer removed. Clear expectation now.
  1331  					if jobKey != "" {
  1332  						jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
  1333  					}
  1334  					if !apierrors.IsNotFound(err) {
  1335  						errCh <- err
  1336  						utilruntime.HandleError(fmt.Errorf("removing tracking finalizer: %w", err))
  1337  						return
  1338  					}
  1339  				}
  1340  				succeeded[i] = true
  1341  			}
  1342  		}(i)
  1343  	}
  1344  	wg.Wait()
  1345  
  1346  	return succeeded, errorFromChannel(errCh)
  1347  }
  1348  
  1349  // enactJobFinished adds the Complete or Failed condition and records events.
  1350  // Returns whether the Job was considered finished.
  1351  func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
  1352  	if finishedCond == nil {
  1353  		return false
  1354  	}
  1355  	if uncounted := job.Status.UncountedTerminatedPods; uncounted != nil {
  1356  		if len(uncounted.Succeeded) > 0 || len(uncounted.Failed) > 0 {
  1357  			return false
  1358  		}
  1359  	}
  1360  	job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message, jm.clock.Now())
  1361  	if finishedCond.Type == batch.JobComplete {
  1362  		job.Status.CompletionTime = &finishedCond.LastTransitionTime
  1363  	}
  1364  	return true
  1365  }
  1366  
  1367  // recordJobFinished records events and the job_finished_total metric for a finished job.
  1368  func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
  1369  	completionMode := getCompletionMode(job)
  1370  	if finishedCond.Type == batch.JobComplete {
  1371  		if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions {
  1372  			jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
  1373  		}
  1374  		jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
  1375  		metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded", "").Inc()
  1376  	} else {
  1377  		jm.recorder.Event(job, v1.EventTypeWarning, finishedCond.Reason, finishedCond.Message)
  1378  		metrics.JobFinishedNum.WithLabelValues(completionMode, "failed", finishedCond.Reason).Inc()
  1379  	}
  1380  	return true
  1381  }
  1382  
  1383  func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[string]) []types.UID {
  1384  	var newUncounted []types.UID
  1385  	for _, uid := range uncounted {
  1386  		if include.Has(string(uid)) {
  1387  			newUncounted = append(newUncounted, uid)
  1388  		}
  1389  	}
  1390  	return newUncounted
  1391  }
  1392  
  1393  // newFailedConditionForFailureTarget creates a job Failed condition based on
  1394  // the interim FailureTarget condition.
  1395  func newFailedConditionForFailureTarget(condition *batch.JobCondition, now time.Time) *batch.JobCondition {
  1396  	return newCondition(batch.JobFailed, v1.ConditionTrue, condition.Reason, condition.Message, now)
  1397  }
  1398  
  1399  // pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit
  1400  // this method applies only to pods with restartPolicy == OnFailure
  1401  func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
  1402  	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
  1403  		return false
  1404  	}
  1405  	result := int32(0)
  1406  	for i := range pods {
  1407  		po := pods[i]
  1408  		if po.Status.Phase == v1.PodRunning || po.Status.Phase == v1.PodPending {
  1409  			for j := range po.Status.InitContainerStatuses {
  1410  				stat := po.Status.InitContainerStatuses[j]
  1411  				result += stat.RestartCount
  1412  			}
  1413  			for j := range po.Status.ContainerStatuses {
  1414  				stat := po.Status.ContainerStatuses[j]
  1415  				result += stat.RestartCount
  1416  			}
  1417  		}
  1418  	}
  1419  	if *job.Spec.BackoffLimit == 0 {
  1420  		return result > 0
  1421  	}
  1422  	return result >= *job.Spec.BackoffLimit
  1423  }
  1424  
  1425  // pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if
  1426  // it is exceeded. If the job is currently suspended, the function will always
  1427  // return false.
  1428  func (jm *Controller) pastActiveDeadline(job *batch.Job) bool {
  1429  	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil || jobSuspended(job) {
  1430  		return false
  1431  	}
  1432  	duration := jm.clock.Since(job.Status.StartTime.Time)
  1433  	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
  1434  	return duration >= allowedDuration
  1435  }
  1436  
  1437  func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) *batch.JobCondition {
  1438  	return &batch.JobCondition{
  1439  		Type:               conditionType,
  1440  		Status:             status,
  1441  		LastProbeTime:      metav1.NewTime(now),
  1442  		LastTransitionTime: metav1.NewTime(now),
  1443  		Reason:             reason,
  1444  		Message:            message,
  1445  	}
  1446  }
  1447  
  1448  // getFailJobMessage returns a job failure message if the job should fail with the current counters
  1449  func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {
  1450  	if !feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || job.Spec.PodFailurePolicy == nil {
  1451  		return nil
  1452  	}
  1453  	for _, p := range pods {
  1454  		if isPodFailed(p, job) {
  1455  			jobFailureMessage, _, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
  1456  			if jobFailureMessage != nil {
  1457  				return jobFailureMessage
  1458  			}
  1459  		}
  1460  	}
  1461  	return nil
  1462  }
  1463  
  1464  // getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
  1465  // in the job status. The list of failed pods can be affected by the podFailurePolicy.
  1466  func getNewFinishedPods(jobCtx *syncJobCtx) (succeededPods, failedPods []*v1.Pod) {
  1467  	succeededPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Succeeded(), func(p *v1.Pod) bool {
  1468  		return p.Status.Phase == v1.PodSucceeded
  1469  	})
  1470  	failedPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Failed(), func(p *v1.Pod) bool {
  1471  		return isPodFailed(p, jobCtx.job)
  1472  	})
  1473  	return succeededPods, failedPods
  1474  }
  1475  
  1476  // jobSuspended returns whether a Job is suspended while taking the feature
  1477  // gate into account.
  1478  func jobSuspended(job *batch.Job) bool {
  1479  	return job.Spec.Suspend != nil && *job.Spec.Suspend
  1480  }
  1481  
  1482  // manageJob is the core method responsible for managing the number of running
  1483  // pods according to what is specified in the job.Spec.
  1484  // Respects back-off; does not create new pods if the back-off time has not passed
  1485  // Does NOT modify <activePods>.
  1486  func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
  1487  	logger := klog.FromContext(ctx)
  1488  	active := int32(len(jobCtx.activePods))
  1489  	parallelism := *job.Spec.Parallelism
  1490  	jobKey, err := controller.KeyFunc(job)
  1491  	if err != nil {
  1492  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
  1493  		return 0, metrics.JobSyncActionTracking, nil
  1494  	}
  1495  
  1496  	if jobSuspended(job) {
  1497  		logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
  1498  		podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
  1499  		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
  1500  		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
  1501  		active -= removed
  1502  		return active, metrics.JobSyncActionPodsDeleted, err
  1503  	}
  1504  
  1505  	var terminating int32 = 0
  1506  	if onlyReplaceFailedPods(jobCtx.job) {
  1507  		// For PodFailurePolicy specified but PodReplacementPolicy disabled
  1508  		// we still need to count terminating pods for replica counts
  1509  		// But we will not allow updates to status.
  1510  		if jobCtx.terminating == nil {
  1511  			terminating = controller.CountTerminatingPods(jobCtx.pods)
  1512  		} else {
  1513  			terminating = *jobCtx.terminating
  1514  		}
  1515  	}
  1516  	wantActive := int32(0)
  1517  	if job.Spec.Completions == nil {
  1518  		// Job does not specify a number of completions.  Therefore, number active
  1519  		// should be equal to parallelism, unless the job has seen at least
  1520  		// once success, in which leave whatever is running, running.
  1521  		if jobCtx.succeeded > 0 {
  1522  			wantActive = active
  1523  		} else {
  1524  			wantActive = parallelism
  1525  		}
  1526  	} else {
  1527  		// Job specifies a specific number of completions.  Therefore, number
  1528  		// active should not ever exceed number of remaining completions.
  1529  		wantActive = *job.Spec.Completions - jobCtx.succeeded
  1530  		if wantActive > parallelism {
  1531  			wantActive = parallelism
  1532  		}
  1533  		if wantActive < 0 {
  1534  			wantActive = 0
  1535  		}
  1536  	}
  1537  
  1538  	rmAtLeast := active - wantActive
  1539  	if rmAtLeast < 0 {
  1540  		rmAtLeast = 0
  1541  	}
  1542  	podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast))
  1543  	if len(podsToDelete) > MaxPodCreateDeletePerSync {
  1544  		podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
  1545  	}
  1546  	if len(podsToDelete) > 0 {
  1547  		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
  1548  		logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
  1549  		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
  1550  		active -= removed
  1551  		// While it is possible for a Job to require both pod creations and
  1552  		// deletions at the same time (e.g. indexed Jobs with repeated indexes), we
  1553  		// restrict ourselves to either just pod deletion or pod creation in any
  1554  		// given sync cycle. Of these two, pod deletion takes precedence.
  1555  		return active, metrics.JobSyncActionPodsDeleted, err
  1556  	}
  1557  
  1558  	if diff := wantActive - terminating - active; diff > 0 {
  1559  		var remainingTime time.Duration
  1560  		if !hasBackoffLimitPerIndex(job) {
  1561  			// we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
  1562  			remainingTime = jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
  1563  		}
  1564  		if remainingTime > 0 {
  1565  			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
  1566  			return 0, metrics.JobSyncActionPodsCreated, nil
  1567  		}
  1568  		if diff > int32(MaxPodCreateDeletePerSync) {
  1569  			diff = int32(MaxPodCreateDeletePerSync)
  1570  		}
  1571  
  1572  		var indexesToAdd []int
  1573  		if isIndexedJob(job) {
  1574  			indexesToAdd = firstPendingIndexes(jobCtx, int(diff), int(*job.Spec.Completions))
  1575  			if hasBackoffLimitPerIndex(job) {
  1576  				indexesToAdd, remainingTime = jm.getPodCreationInfoForIndependentIndexes(logger, indexesToAdd, jobCtx.podsWithDelayedDeletionPerIndex)
  1577  				if remainingTime > 0 {
  1578  					jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
  1579  					return 0, metrics.JobSyncActionPodsCreated, nil
  1580  				}
  1581  			}
  1582  			diff = int32(len(indexesToAdd))
  1583  		}
  1584  
  1585  		jm.expectations.ExpectCreations(logger, jobKey, int(diff))
  1586  		errCh := make(chan error, diff)
  1587  		logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff)
  1588  
  1589  		wait := sync.WaitGroup{}
  1590  
  1591  		active += diff
  1592  
  1593  		podTemplate := job.Spec.Template.DeepCopy()
  1594  		if isIndexedJob(job) {
  1595  			addCompletionIndexEnvVariables(podTemplate)
  1596  		}
  1597  		podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
  1598  
  1599  		// Counters for pod creation status (used by the job_pods_creation_total metric)
  1600  		var creationsSucceeded, creationsFailed int32 = 0, 0
  1601  
  1602  		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
  1603  		// and double with each successful iteration in a kind of "slow start".
  1604  		// This handles attempts to start large numbers of pods that would
  1605  		// likely all fail with the same error. For example a project with a
  1606  		// low quota that attempts to create a large number of pods will be
  1607  		// prevented from spamming the API service with the pod create requests
  1608  		// after one of its pods fails.  Conveniently, this also prevents the
  1609  		// event spam that those failures would generate.
  1610  		for batchSize := min(diff, int32(controller.SlowStartInitialBatchSize)); diff > 0; batchSize = min(2*batchSize, diff) {
  1611  			errorCount := len(errCh)
  1612  			wait.Add(int(batchSize))
  1613  			for i := int32(0); i < batchSize; i++ {
  1614  				completionIndex := unknownCompletionIndex
  1615  				if len(indexesToAdd) > 0 {
  1616  					completionIndex = indexesToAdd[0]
  1617  					indexesToAdd = indexesToAdd[1:]
  1618  				}
  1619  				go func() {
  1620  					template := podTemplate
  1621  					generateName := ""
  1622  					if completionIndex != unknownCompletionIndex {
  1623  						template = podTemplate.DeepCopy()
  1624  						addCompletionIndexAnnotation(template, completionIndex)
  1625  
  1626  						if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
  1627  							addCompletionIndexLabel(template, completionIndex)
  1628  						}
  1629  						template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
  1630  						generateName = podGenerateNameWithIndex(job.Name, completionIndex)
  1631  						if hasBackoffLimitPerIndex(job) {
  1632  							addIndexFailureCountAnnotation(logger, template, job, jobCtx.podsWithDelayedDeletionPerIndex[completionIndex])
  1633  						}
  1634  					}
  1635  					defer wait.Done()
  1636  					err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
  1637  					if err != nil {
  1638  						if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
  1639  							// If the namespace is being torn down, we can safely ignore
  1640  							// this error since all subsequent creations will fail.
  1641  							return
  1642  						}
  1643  					}
  1644  					if err != nil {
  1645  						defer utilruntime.HandleError(err)
  1646  						// Decrement the expected number of creates because the informer won't observe this pod
  1647  						logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job))
  1648  						jm.expectations.CreationObserved(logger, jobKey)
  1649  						atomic.AddInt32(&active, -1)
  1650  						errCh <- err
  1651  						atomic.AddInt32(&creationsFailed, 1)
  1652  					}
  1653  					atomic.AddInt32(&creationsSucceeded, 1)
  1654  				}()
  1655  			}
  1656  			wait.Wait()
  1657  			// any skipped pods that we never attempted to start shouldn't be expected.
  1658  			skippedPods := diff - batchSize
  1659  			if errorCount < len(errCh) && skippedPods > 0 {
  1660  				logger.V(2).Info("Slow-start failure. Skipping creating pods, decrementing expectations", "skippedCount", skippedPods, "job", klog.KObj(job))
  1661  				active -= skippedPods
  1662  				for i := int32(0); i < skippedPods; i++ {
  1663  					// Decrement the expected number of creates because the informer won't observe this pod
  1664  					jm.expectations.CreationObserved(logger, jobKey)
  1665  				}
  1666  				// The skipped pods will be retried later. The next controller resync will
  1667  				// retry the slow start process.
  1668  				break
  1669  			}
  1670  			diff -= batchSize
  1671  		}
  1672  		recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed)
  1673  		return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
  1674  	}
  1675  
  1676  	return active, metrics.JobSyncActionTracking, nil
  1677  }
  1678  
  1679  // getPodCreationInfoForIndependentIndexes returns a sub-list of all indexes
  1680  // to create that contains those which can be already created. In case no indexes
  1681  // are ready to create pods, it returns the lowest remaining time to create pods
  1682  // out of all indexes.
  1683  func (jm *Controller) getPodCreationInfoForIndependentIndexes(logger klog.Logger, indexesToAdd []int, podsWithDelayedDeletionPerIndex map[int]*v1.Pod) ([]int, time.Duration) {
  1684  	var indexesToAddNow []int
  1685  	var minRemainingTimePerIndex *time.Duration
  1686  	for _, indexToAdd := range indexesToAdd {
  1687  		if remainingTimePerIndex := getRemainingTimePerIndex(logger, jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff, podsWithDelayedDeletionPerIndex[indexToAdd]); remainingTimePerIndex == 0 {
  1688  			indexesToAddNow = append(indexesToAddNow, indexToAdd)
  1689  		} else if minRemainingTimePerIndex == nil || remainingTimePerIndex < *minRemainingTimePerIndex {
  1690  			minRemainingTimePerIndex = &remainingTimePerIndex
  1691  		}
  1692  	}
  1693  	if len(indexesToAddNow) > 0 {
  1694  		return indexesToAddNow, 0
  1695  	}
  1696  	return indexesToAddNow, ptr.Deref(minRemainingTimePerIndex, 0)
  1697  }
  1698  
  1699  // activePodsForRemoval returns Pods that should be removed because there
  1700  // are too many pods running or, if this is an indexed job, there are repeated
  1701  // indexes or invalid indexes or some pods don't have indexes.
  1702  // Sorts candidate pods in the order such that not-ready < ready, unscheduled
  1703  // < scheduled, and pending < running. This ensures that we delete pods
  1704  // in the earlier stages whenever possible.
  1705  func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.Pod {
  1706  	var rm, left []*v1.Pod
  1707  
  1708  	if isIndexedJob(job) {
  1709  		rm = make([]*v1.Pod, 0, rmAtLeast)
  1710  		left = make([]*v1.Pod, 0, len(pods)-rmAtLeast)
  1711  		rm, left = appendDuplicatedIndexPodsForRemoval(rm, left, pods, int(*job.Spec.Completions))
  1712  	} else {
  1713  		left = pods
  1714  	}
  1715  
  1716  	if len(rm) < rmAtLeast {
  1717  		sort.Sort(controller.ActivePods(left))
  1718  		rm = append(rm, left[:rmAtLeast-len(rm)]...)
  1719  	}
  1720  	return rm
  1721  }
  1722  
  1723  // updateJobStatus calls the API to update the job status.
  1724  func (jm *Controller) updateJobStatus(ctx context.Context, job *batch.Job) (*batch.Job, error) {
  1725  	return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{})
  1726  }
  1727  
  1728  func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) error {
  1729  	_, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch(
  1730  		ctx, job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
  1731  	return err
  1732  }
  1733  
  1734  // getValidPodsWithFilter returns the valid pods that pass the filter.
  1735  // Pods are valid if they have a finalizer or in uncounted set
  1736  // and, for Indexed Jobs, a valid completion index.
  1737  func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
  1738  	var result []*v1.Pod
  1739  	for _, p := range jobCtx.pods {
  1740  		uid := string(p.UID)
  1741  
  1742  		// Pods that don't have a completion finalizer are in the uncounted set or
  1743  		// have already been accounted for in the Job status.
  1744  		if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || jobCtx.expectedRmFinalizers.Has(uid) {
  1745  			continue
  1746  		}
  1747  		if isIndexedJob(jobCtx.job) {
  1748  			idx := getCompletionIndex(p.Annotations)
  1749  			if idx == unknownCompletionIndex || idx >= int(*jobCtx.job.Spec.Completions) {
  1750  				continue
  1751  			}
  1752  		}
  1753  		if filter(p) {
  1754  			result = append(result, p)
  1755  		}
  1756  	}
  1757  	return result
  1758  }
  1759  
  1760  // getCompletionMode returns string representation of the completion mode. Used as a label value for metrics.
  1761  func getCompletionMode(job *batch.Job) string {
  1762  	if isIndexedJob(job) {
  1763  		return string(batch.IndexedCompletion)
  1764  	}
  1765  	return string(batch.NonIndexedCompletion)
  1766  }
  1767  
  1768  func appendJobCompletionFinalizerIfNotFound(finalizers []string) []string {
  1769  	for _, fin := range finalizers {
  1770  		if fin == batch.JobTrackingFinalizer {
  1771  			return finalizers
  1772  		}
  1773  	}
  1774  	return append(finalizers, batch.JobTrackingFinalizer)
  1775  }
  1776  
  1777  func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {
  1778  	if !hasJobTrackingFinalizer(pod) {
  1779  		return nil
  1780  	}
  1781  	patch := map[string]interface{}{
  1782  		"metadata": map[string]interface{}{
  1783  			"$deleteFromPrimitiveList/finalizers": []string{batch.JobTrackingFinalizer},
  1784  		},
  1785  	}
  1786  	patchBytes, _ := json.Marshal(patch)
  1787  	return patchBytes
  1788  }
  1789  
  1790  type uncountedTerminatedPods struct {
  1791  	succeeded sets.Set[string]
  1792  	failed    sets.Set[string]
  1793  }
  1794  
  1795  func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods {
  1796  	obj := uncountedTerminatedPods{
  1797  		succeeded: make(sets.Set[string], len(in.Succeeded)),
  1798  		failed:    make(sets.Set[string], len(in.Failed)),
  1799  	}
  1800  	for _, v := range in.Succeeded {
  1801  		obj.succeeded.Insert(string(v))
  1802  	}
  1803  	for _, v := range in.Failed {
  1804  		obj.failed.Insert(string(v))
  1805  	}
  1806  	return &obj
  1807  }
  1808  
  1809  func (u *uncountedTerminatedPods) Succeeded() sets.Set[string] {
  1810  	if u == nil {
  1811  		return nil
  1812  	}
  1813  	return u.succeeded
  1814  }
  1815  
  1816  func (u *uncountedTerminatedPods) Failed() sets.Set[string] {
  1817  	if u == nil {
  1818  		return nil
  1819  	}
  1820  	return u.failed
  1821  }
  1822  
  1823  func errorFromChannel(errCh <-chan error) error {
  1824  	select {
  1825  	case err := <-errCh:
  1826  		return err
  1827  	default:
  1828  	}
  1829  	return nil
  1830  }
  1831  
  1832  // ensureJobConditionStatus appends or updates an existing job condition of the
  1833  // given type with the given status value. Note that this function will not
  1834  // append to the conditions list if the new condition's status is false
  1835  // (because going from nothing to false is meaningless); it can, however,
  1836  // update the status condition to false. The function returns a bool to let the
  1837  // caller know if the list was changed (either appended or updated).
  1838  func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) ([]batch.JobCondition, bool) {
  1839  	if condition := findConditionByType(list, cType); condition != nil {
  1840  		if condition.Status != status || condition.Reason != reason || condition.Message != message {
  1841  			*condition = *newCondition(cType, status, reason, message, now)
  1842  			return list, true
  1843  		}
  1844  		return list, false
  1845  	}
  1846  	// A condition with that type doesn't exist in the list.
  1847  	if status != v1.ConditionFalse {
  1848  		return append(list, *newCondition(cType, status, reason, message, now)), true
  1849  	}
  1850  	return list, false
  1851  }
  1852  
  1853  func isPodFailed(p *v1.Pod, job *batch.Job) bool {
  1854  	if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
  1855  		// When PodDisruptionConditions is enabled, orphan Pods and unschedulable
  1856  		// terminating Pods are marked as Failed. So we only need to check the phase.
  1857  		// TODO(#113855): Stop limiting this behavior to Jobs with podFailurePolicy.
  1858  		// For now, we do so to avoid affecting all running Jobs without the
  1859  		// availability to opt-out into the old behavior.
  1860  		return p.Status.Phase == v1.PodFailed
  1861  	}
  1862  	if p.Status.Phase == v1.PodFailed {
  1863  		return true
  1864  	}
  1865  	if onlyReplaceFailedPods(job) {
  1866  		return p.Status.Phase == v1.PodFailed
  1867  	}
  1868  	// Count deleted Pods as failures to account for orphan Pods that
  1869  	// never have a chance to reach the Failed phase.
  1870  	return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
  1871  }
  1872  
  1873  func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition {
  1874  	for i := range list {
  1875  		if list[i].Type == cType {
  1876  			return &list[i]
  1877  		}
  1878  	}
  1879  	return nil
  1880  }
  1881  
  1882  func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.JobStatus) {
  1883  	completionMode := completionModeStr(job)
  1884  	var diff int
  1885  
  1886  	// Updating succeeded metric must be handled differently
  1887  	// for Indexed Jobs to handle the case where the job has
  1888  	// been scaled down by reducing completions & parallelism
  1889  	// in tandem, and now a previously completed index is
  1890  	// now out of range (i.e. index >= spec.Completions).
  1891  	if isIndexedJob(job) {
  1892  		completions := int(*job.Spec.Completions)
  1893  		if job.Status.CompletedIndexes != oldCounters.CompletedIndexes {
  1894  			diff = indexesCount(logger, &job.Status.CompletedIndexes, completions) - indexesCount(logger, &oldCounters.CompletedIndexes, completions)
  1895  		}
  1896  		backoffLimitLabel := backoffLimitMetricsLabel(job)
  1897  		metrics.JobFinishedIndexesTotal.WithLabelValues(metrics.Succeeded, backoffLimitLabel).Add(float64(diff))
  1898  		if hasBackoffLimitPerIndex(job) && job.Status.FailedIndexes != oldCounters.FailedIndexes {
  1899  			if failedDiff := indexesCount(logger, job.Status.FailedIndexes, completions) - indexesCount(logger, oldCounters.FailedIndexes, completions); failedDiff > 0 {
  1900  				metrics.JobFinishedIndexesTotal.WithLabelValues(metrics.Failed, backoffLimitLabel).Add(float64(failedDiff))
  1901  			}
  1902  		}
  1903  	} else {
  1904  		diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded)
  1905  	}
  1906  	metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff))
  1907  
  1908  	// Update failed metric.
  1909  	diff = int(job.Status.Failed - oldCounters.Failed)
  1910  	metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
  1911  }
  1912  
  1913  func indexesCount(logger klog.Logger, indexesStr *string, completions int) int {
  1914  	if indexesStr == nil {
  1915  		return 0
  1916  	}
  1917  	return parseIndexesFromString(logger, *indexesStr, completions).total()
  1918  }
  1919  
  1920  func backoffLimitMetricsLabel(job *batch.Job) string {
  1921  	if hasBackoffLimitPerIndex(job) {
  1922  		return "perIndex"
  1923  	}
  1924  	return "global"
  1925  }
  1926  
  1927  func recordJobPodFailurePolicyActions(job *batch.Job, podFailureCountByPolicyAction map[string]int) {
  1928  	for action, count := range podFailureCountByPolicyAction {
  1929  		metrics.PodFailuresHandledByFailurePolicy.WithLabelValues(action).Add(float64(count))
  1930  	}
  1931  }
  1932  
  1933  func countReadyPods(pods []*v1.Pod) int32 {
  1934  	cnt := int32(0)
  1935  	for _, p := range pods {
  1936  		if podutil.IsPodReady(p) {
  1937  			cnt++
  1938  		}
  1939  	}
  1940  	return cnt
  1941  }
  1942  
  1943  // This checks if we should apply PodReplacementPolicy.
  1944  // PodReplacementPolicy controls when we recreate pods if they are marked as terminating
  1945  // Failed means that we recreate only once the pod has terminated.
  1946  func onlyReplaceFailedPods(job *batch.Job) bool {
  1947  	// We check both PodReplacementPolicy for nil and failed
  1948  	// because it is possible that  `PodReplacementPolicy` is not defaulted,
  1949  	// when the `JobPodReplacementPolicy` feature gate is disabled for API server.
  1950  	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) && job.Spec.PodReplacementPolicy != nil && *job.Spec.PodReplacementPolicy == batch.Failed {
  1951  		return true
  1952  	}
  1953  	return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
  1954  }
  1955  
  1956  func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
  1957  	// Listing pods shouldn't really fail, as we are just querying the informer cache.
  1958  	selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
  1959  	if err != nil {
  1960  		utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
  1961  		return
  1962  	}
  1963  	pods, _ := jm.podStore.Pods(job.Namespace).List(selector)
  1964  	for _, pod := range pods {
  1965  		if metav1.IsControlledBy(pod, job) && hasJobTrackingFinalizer(pod) {
  1966  			jm.enqueueOrphanPod(pod)
  1967  		}
  1968  	}
  1969  }
  1970  
  1971  func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) {
  1972  	reason := metrics.PodCreateNew
  1973  	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
  1974  		if ptr.Deref(job.Spec.PodReplacementPolicy, batch.TerminatingOrFailed) == batch.Failed && jobCtx.failed > 0 {
  1975  			reason = metrics.PodRecreateFailed
  1976  		} else if jobCtx.failed > 0 || ptr.Deref(jobCtx.terminating, 0) > 0 {
  1977  			reason = metrics.PodRecreateTerminatingOrFailed
  1978  		}
  1979  	}
  1980  	if succeeded > 0 {
  1981  		metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Succeeded).Add(float64(succeeded))
  1982  	}
  1983  	if failed > 0 {
  1984  		metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Failed).Add(float64(failed))
  1985  	}
  1986  }
  1987  
  1988  func managedByExternalController(jobObj *batch.Job) *string {
  1989  	if feature.DefaultFeatureGate.Enabled(features.JobManagedBy) {
  1990  		if controllerName := jobObj.Spec.ManagedBy; controllerName != nil && *controllerName != batch.JobControllerName {
  1991  			return controllerName
  1992  		}
  1993  	}
  1994  	return nil
  1995  }
  1996  

View as plain text