/* Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package job import ( "fmt" "math" "sort" "strconv" "strings" batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/storage/names" "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/features" ) const ( completionIndexEnvName = "JOB_COMPLETION_INDEX" unknownCompletionIndex = -1 ) func isIndexedJob(job *batch.Job) bool { return job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion } func hasBackoffLimitPerIndex(job *batch.Job) bool { return feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) && job.Spec.BackoffLimitPerIndex != nil } type interval struct { First int Last int } type orderedIntervals []interval // calculateSucceededIndexes returns the old and new list of succeeded indexes // in compressed format (intervals). // The old list is solely based off .status.completedIndexes, but returns an // empty list if this Job is not tracked with finalizers. The new list includes // the indexes that succeeded since the last sync. func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) { prevIntervals := parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) newSucceeded := sets.New[int]() for _, p := range pods { ix := getCompletionIndex(p.Annotations) // Succeeded Pod with valid index and, if tracking with finalizers, // has a finalizer (meaning that it is not counted yet). if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) { newSucceeded.Insert(ix) } } // List returns the items of the set in order. result := prevIntervals.withOrderedIndexes(sets.List(newSucceeded)) return prevIntervals, result } // calculateFailedIndexes returns the list of failed indexes in compressed // format (intervals). The list includes indexes already present in // .status.failedIndexes and indexes that failed since the last sync. func calculateFailedIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) *orderedIntervals { var prevIntervals orderedIntervals if job.Status.FailedIndexes != nil { prevIntervals = parseIndexesFromString(logger, *job.Status.FailedIndexes, int(*job.Spec.Completions)) } newFailed := sets.New[int]() for _, p := range pods { ix := getCompletionIndex(p.Annotations) // Failed Pod with valid index and has a finalizer (meaning that it is not counted yet). if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) && isIndexFailed(logger, job, p) { newFailed.Insert(ix) } } // List returns the items of the set in order. result := prevIntervals.withOrderedIndexes(sets.List(newFailed)) return &result } func isIndexFailed(logger klog.Logger, job *batch.Job, pod *v1.Pod) bool { isPodFailedCounted := false if isPodFailed(pod, job) { if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { _, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod) if action != nil && *action == batch.PodFailurePolicyActionFailIndex { return true } isPodFailedCounted = countFailed } else { isPodFailedCounted = true } } return isPodFailedCounted && getIndexFailureCount(logger, pod) >= *job.Spec.BackoffLimitPerIndex } // withOrderedIndexes returns a new list of ordered intervals that contains // the newIndexes, provided in increasing order. func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals { newIndexIntervals := make(orderedIntervals, len(newIndexes)) for i, newIndex := range newIndexes { newIndexIntervals[i] = interval{newIndex, newIndex} } return oi.merge(newIndexIntervals) } // with returns a new list of ordered intervals that contains the newOrderedIntervals. func (oi orderedIntervals) merge(newOi orderedIntervals) orderedIntervals { var result orderedIntervals i := 0 j := 0 var lastInterval *interval appendOrMergeWithLastInterval := func(thisInterval interval) { if lastInterval == nil || thisInterval.First > lastInterval.Last+1 { result = append(result, thisInterval) lastInterval = &result[len(result)-1] } else if lastInterval.Last < thisInterval.Last { lastInterval.Last = thisInterval.Last } } for i < len(oi) && j < len(newOi) { if oi[i].First < newOi[j].First { appendOrMergeWithLastInterval(oi[i]) i++ } else { appendOrMergeWithLastInterval(newOi[j]) j++ } } for i < len(oi) { appendOrMergeWithLastInterval(oi[i]) i++ } for j < len(newOi) { appendOrMergeWithLastInterval(newOi[j]) j++ } return result } // total returns number of indexes contained in the intervals. func (oi orderedIntervals) total() int { var count int for _, iv := range oi { count += iv.Last - iv.First + 1 } return count } func (oi orderedIntervals) String() string { var builder strings.Builder for _, v := range oi { if builder.Len() > 0 { builder.WriteRune(',') } builder.WriteString(strconv.Itoa(v.First)) if v.Last > v.First { if v.Last == v.First+1 { builder.WriteRune(',') } else { builder.WriteRune('-') } builder.WriteString(strconv.Itoa(v.Last)) } } return builder.String() } func (oi orderedIntervals) has(ix int) bool { lo := 0 hi := len(oi) // Invariant: oi[hi].Last >= ix for hi > lo { mid := lo + (hi-lo)/2 if oi[mid].Last >= ix { hi = mid } else { lo = mid + 1 } } if hi == len(oi) { return false } return oi[hi].First <= ix } func parseIndexesFromString(logger klog.Logger, indexesStr string, completions int) orderedIntervals { if indexesStr == "" { return nil } var result orderedIntervals var lastInterval *interval for _, intervalStr := range strings.Split(indexesStr, ",") { limitsStr := strings.Split(intervalStr, "-") var inter interval var err error inter.First, err = strconv.Atoi(limitsStr[0]) if err != nil { logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err) continue } if inter.First >= completions { break } if len(limitsStr) > 1 { inter.Last, err = strconv.Atoi(limitsStr[1]) if err != nil { logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err) continue } if inter.Last >= completions { inter.Last = completions - 1 } } else { inter.Last = inter.First } if lastInterval != nil && lastInterval.Last == inter.First-1 { lastInterval.Last = inter.Last } else { result = append(result, inter) lastInterval = &result[len(result)-1] } } return result } // firstPendingIndexes returns `count` indexes less than `completions` that are // not covered by `activePods`, `succeededIndexes` or `failedIndexes`. // In cases of PodReplacementPolicy as Failed we will include `terminatingPods` in this list. func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int { if count == 0 { return nil } active := getIndexes(jobCtx.activePods) result := make([]int, 0, count) nonPending := jobCtx.succeededIndexes.withOrderedIndexes(sets.List(active)) if onlyReplaceFailedPods(jobCtx.job) { terminating := getIndexes(controller.FilterTerminatingPods(jobCtx.pods)) nonPending = nonPending.withOrderedIndexes(sets.List(terminating)) } if jobCtx.failedIndexes != nil { nonPending = nonPending.merge(*jobCtx.failedIndexes) } // The following algorithm is bounded by len(nonPending) and count. candidate := 0 for _, sInterval := range nonPending { for ; candidate < completions && len(result) < count && candidate < sInterval.First; candidate++ { result = append(result, candidate) } if candidate < sInterval.Last+1 { candidate = sInterval.Last + 1 } } for ; candidate < completions && len(result) < count; candidate++ { result = append(result, candidate) } return result } // Returns the list of indexes corresponding to the set of pods func getIndexes(pods []*v1.Pod) sets.Set[int] { result := sets.New[int]() for _, p := range pods { ix := getCompletionIndex(p.Annotations) if ix != unknownCompletionIndex { result.Insert(ix) } } return result } // appendDuplicatedIndexPodsForRemoval scans active `pods` for duplicated // completion indexes. For each index, it selects n-1 pods for removal, where n // is the number of repetitions. The pods to be removed are appended to `rm`, // while the remaining pods are appended to `left`. // All pods that don't have a completion index are appended to `rm`. // All pods with index not in valid range are appended to `rm`. func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod, completions int) ([]*v1.Pod, []*v1.Pod) { sort.Sort(byCompletionIndex(pods)) lastIndex := unknownCompletionIndex firstRepeatPos := 0 countLooped := 0 for i, p := range pods { ix := getCompletionIndex(p.Annotations) if ix >= completions { rm = append(rm, pods[i:]...) break } if ix != lastIndex { rm, left = appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:i], lastIndex) firstRepeatPos = i lastIndex = ix } countLooped += 1 } return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:countLooped], lastIndex) } // getPodsWithDelayedDeletionPerIndex returns the pod which removal is delayed // in order to await for recreation. This map is used when BackoffLimitPerIndex // is enabled to delay pod finalizer removal, and thus pod deletion, until the // replacement pod is created. The pod deletion is delayed so that the // replacement pod can have the batch.kubernetes.io/job-index-failure-count // annotation set properly keeping track of the number of failed pods within // the index. func getPodsWithDelayedDeletionPerIndex(logger klog.Logger, jobCtx *syncJobCtx) map[int]*v1.Pod { // the failed pods corresponding to currently active indexes can be safely // deleted as the failure count annotation is present in the currently // active pods. activeIndexes := getIndexes(jobCtx.activePods) podsWithDelayedDeletionPerIndex := make(map[int]*v1.Pod) getValidPodsWithFilter(jobCtx, nil, func(p *v1.Pod) bool { if isPodFailed(p, jobCtx.job) { if ix := getCompletionIndex(p.Annotations); ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) { if jobCtx.succeededIndexes.has(ix) || jobCtx.failedIndexes.has(ix) || activeIndexes.Has(ix) { return false } if lastPodWithDelayedDeletion, ok := podsWithDelayedDeletionPerIndex[ix]; ok { if getIndexAbsoluteFailureCount(logger, lastPodWithDelayedDeletion) <= getIndexAbsoluteFailureCount(logger, p) && !getFinishedTime(p).Before(getFinishedTime(lastPodWithDelayedDeletion)) { podsWithDelayedDeletionPerIndex[ix] = p } } else { podsWithDelayedDeletionPerIndex[ix] = p } } } return false }) return podsWithDelayedDeletionPerIndex } func addIndexFailureCountAnnotation(logger klog.Logger, template *v1.PodTemplateSpec, job *batch.Job, podBeingReplaced *v1.Pod) { indexFailureCount, indexIgnoredFailureCount := getNewIndexFailureCounts(logger, job, podBeingReplaced) template.Annotations[batch.JobIndexFailureCountAnnotation] = strconv.Itoa(int(indexFailureCount)) if indexIgnoredFailureCount > 0 { template.Annotations[batch.JobIndexIgnoredFailureCountAnnotation] = strconv.Itoa(int(indexIgnoredFailureCount)) } } // getNewIndexFailureCount returns the value of the index-failure-count // annotation for the new pod being created func getNewIndexFailureCounts(logger klog.Logger, job *batch.Job, podBeingReplaced *v1.Pod) (int32, int32) { if podBeingReplaced != nil { indexFailureCount := parseIndexFailureCountAnnotation(logger, podBeingReplaced) indexIgnoredFailureCount := parseIndexFailureIgnoreCountAnnotation(logger, podBeingReplaced) if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { _, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, podBeingReplaced) if countFailed { indexFailureCount++ } else { indexIgnoredFailureCount++ } } else { indexFailureCount++ } return indexFailureCount, indexIgnoredFailureCount } return 0, 0 } func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) { if ix == unknownCompletionIndex { rm = append(rm, pods...) return rm, left } if len(pods) == 1 { left = append(left, pods[0]) return rm, left } sort.Sort(controller.ActivePods(pods)) rm = append(rm, pods[:len(pods)-1]...) left = append(left, pods[len(pods)-1]) return rm, left } func getCompletionIndex(annotations map[string]string) int { if annotations == nil { return unknownCompletionIndex } v, ok := annotations[batch.JobCompletionIndexAnnotation] if !ok { return unknownCompletionIndex } i, err := strconv.Atoi(v) if err != nil { return unknownCompletionIndex } if i < 0 { return unknownCompletionIndex } return i } // getIndexFailureCount returns the value of the batch.kubernetes.io/job-index-failure-count // annotation as int32. It fallbacks to 0 when: // - there is no annotation - for example the pod was created when the BackoffLimitPerIndex // feature was temporarily disabled, or the annotation was manually removed by the user, // - the value of the annotation isn't parsable as int - for example because // it was set by a malicious user, // - the value of the annotation is negative or greater by int32 - for example // because it was set by a malicious user. func getIndexFailureCount(logger klog.Logger, pod *v1.Pod) int32 { return parseIndexFailureCountAnnotation(logger, pod) } func getIndexAbsoluteFailureCount(logger klog.Logger, pod *v1.Pod) int32 { return parseIndexFailureCountAnnotation(logger, pod) + parseIndexFailureIgnoreCountAnnotation(logger, pod) } func parseIndexFailureCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 { if value, ok := pod.Annotations[batch.JobIndexFailureCountAnnotation]; ok { return parseInt32(logger, value) } logger.V(3).Info("There is no expected annotation", "annotationKey", batch.JobIndexFailureCountAnnotation, "pod", klog.KObj(pod), "podUID", pod.UID) return 0 } func parseIndexFailureIgnoreCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 { if value, ok := pod.Annotations[batch.JobIndexIgnoredFailureCountAnnotation]; ok { return parseInt32(logger, value) } return 0 } func parseInt32(logger klog.Logger, vStr string) int32 { if vInt, err := strconv.Atoi(vStr); err != nil { logger.Error(err, "Failed to parse the value", "value", vStr) return 0 } else if vInt < 0 || vInt > math.MaxInt32 { logger.Info("The value is invalid", "value", vInt) return 0 } else { return int32(vInt) } } func addCompletionIndexEnvVariables(template *v1.PodTemplateSpec) { for i := range template.Spec.InitContainers { addCompletionIndexEnvVariable(&template.Spec.InitContainers[i]) } for i := range template.Spec.Containers { addCompletionIndexEnvVariable(&template.Spec.Containers[i]) } } func addCompletionIndexEnvVariable(container *v1.Container) { for _, v := range container.Env { if v.Name == completionIndexEnvName { return } } var fieldPath string if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) { fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation) } else { fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation) } container.Env = append(container.Env, v1.EnvVar{ Name: completionIndexEnvName, ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ FieldPath: fieldPath, }, }, }) } func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) { if template.Annotations == nil { template.Annotations = make(map[string]string, 1) } template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index) } func addCompletionIndexLabel(template *v1.PodTemplateSpec, index int) { if template.Labels == nil { template.Labels = make(map[string]string, 1) } // For consistency, we use the annotation batch.kubernetes.io/job-completion-index for the corresponding label as well. template.Labels[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index) } func podGenerateNameWithIndex(jobName string, index int) string { appendIndex := "-" + strconv.Itoa(index) + "-" generateNamePrefix := jobName + appendIndex if len(generateNamePrefix) > names.MaxGeneratedNameLength { generateNamePrefix = generateNamePrefix[:names.MaxGeneratedNameLength-len(appendIndex)] + appendIndex } return generateNamePrefix } type byCompletionIndex []*v1.Pod func (bci byCompletionIndex) Less(i, j int) bool { return getCompletionIndex(bci[i].Annotations) < getCompletionIndex(bci[j].Annotations) } func (bci byCompletionIndex) Swap(i, j int) { bci[i], bci[j] = bci[j], bci[i] } func (bci byCompletionIndex) Len() int { return len(bci) } func completionModeStr(job *batch.Job) string { if job.Spec.CompletionMode != nil { return string(*job.Spec.CompletionMode) } return string(batch.NonIndexedCompletion) }