1
16
17 package job
18
19 import (
20 "fmt"
21 "math"
22 "sort"
23 "strconv"
24 "strings"
25
26 batch "k8s.io/api/batch/v1"
27 v1 "k8s.io/api/core/v1"
28 "k8s.io/apimachinery/pkg/util/sets"
29 "k8s.io/apiserver/pkg/storage/names"
30 "k8s.io/apiserver/pkg/util/feature"
31 "k8s.io/klog/v2"
32 "k8s.io/kubernetes/pkg/controller"
33 "k8s.io/kubernetes/pkg/features"
34 )
35
const (
	// completionIndexEnvName is the environment variable that
	// addCompletionIndexEnvVariable adds to pod template containers so the
	// workload can read its completion index.
	completionIndexEnvName = "JOB_COMPLETION_INDEX"
	// unknownCompletionIndex is the sentinel getCompletionIndex returns when a
	// pod has no valid (present, numeric, non-negative) completion index.
	unknownCompletionIndex = -1
)
40
41 func isIndexedJob(job *batch.Job) bool {
42 return job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion
43 }
44
45 func hasBackoffLimitPerIndex(job *batch.Job) bool {
46 return feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) && job.Spec.BackoffLimitPerIndex != nil
47 }
48
// interval is an inclusive range of completion indexes, [First, Last].
type interval struct {
	First, Last int
}

// orderedIntervals is a list of index intervals that is expected to be sorted
// in ascending order of First; merge and has rely on that ordering.
type orderedIntervals []interval
55
56
57
58
59
60
61 func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
62 prevIntervals := parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
63 newSucceeded := sets.New[int]()
64 for _, p := range pods {
65 ix := getCompletionIndex(p.Annotations)
66
67
68 if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) {
69 newSucceeded.Insert(ix)
70 }
71 }
72
73 result := prevIntervals.withOrderedIndexes(sets.List(newSucceeded))
74 return prevIntervals, result
75 }
76
77
78
79
80 func calculateFailedIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) *orderedIntervals {
81 var prevIntervals orderedIntervals
82 if job.Status.FailedIndexes != nil {
83 prevIntervals = parseIndexesFromString(logger, *job.Status.FailedIndexes, int(*job.Spec.Completions))
84 }
85 newFailed := sets.New[int]()
86 for _, p := range pods {
87 ix := getCompletionIndex(p.Annotations)
88
89 if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) && isIndexFailed(logger, job, p) {
90 newFailed.Insert(ix)
91 }
92 }
93
94 result := prevIntervals.withOrderedIndexes(sets.List(newFailed))
95 return &result
96 }
97
98 func isIndexFailed(logger klog.Logger, job *batch.Job, pod *v1.Pod) bool {
99 isPodFailedCounted := false
100 if isPodFailed(pod, job) {
101 if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
102 _, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
103 if action != nil && *action == batch.PodFailurePolicyActionFailIndex {
104 return true
105 }
106 isPodFailedCounted = countFailed
107 } else {
108 isPodFailedCounted = true
109 }
110 }
111 return isPodFailedCounted && getIndexFailureCount(logger, pod) >= *job.Spec.BackoffLimitPerIndex
112 }
113
114
115
116 func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals {
117 newIndexIntervals := make(orderedIntervals, len(newIndexes))
118 for i, newIndex := range newIndexes {
119 newIndexIntervals[i] = interval{newIndex, newIndex}
120 }
121 return oi.merge(newIndexIntervals)
122 }
123
124
125 func (oi orderedIntervals) merge(newOi orderedIntervals) orderedIntervals {
126 var result orderedIntervals
127 i := 0
128 j := 0
129 var lastInterval *interval
130 appendOrMergeWithLastInterval := func(thisInterval interval) {
131 if lastInterval == nil || thisInterval.First > lastInterval.Last+1 {
132 result = append(result, thisInterval)
133 lastInterval = &result[len(result)-1]
134 } else if lastInterval.Last < thisInterval.Last {
135 lastInterval.Last = thisInterval.Last
136 }
137 }
138 for i < len(oi) && j < len(newOi) {
139 if oi[i].First < newOi[j].First {
140 appendOrMergeWithLastInterval(oi[i])
141 i++
142 } else {
143 appendOrMergeWithLastInterval(newOi[j])
144 j++
145 }
146 }
147 for i < len(oi) {
148 appendOrMergeWithLastInterval(oi[i])
149 i++
150 }
151 for j < len(newOi) {
152 appendOrMergeWithLastInterval(newOi[j])
153 j++
154 }
155 return result
156 }
157
158
159 func (oi orderedIntervals) total() int {
160 var count int
161 for _, iv := range oi {
162 count += iv.Last - iv.First + 1
163 }
164 return count
165 }
166
167 func (oi orderedIntervals) String() string {
168 var builder strings.Builder
169 for _, v := range oi {
170 if builder.Len() > 0 {
171 builder.WriteRune(',')
172 }
173 builder.WriteString(strconv.Itoa(v.First))
174 if v.Last > v.First {
175 if v.Last == v.First+1 {
176 builder.WriteRune(',')
177 } else {
178 builder.WriteRune('-')
179 }
180 builder.WriteString(strconv.Itoa(v.Last))
181 }
182 }
183 return builder.String()
184 }
185
186 func (oi orderedIntervals) has(ix int) bool {
187 lo := 0
188 hi := len(oi)
189
190 for hi > lo {
191 mid := lo + (hi-lo)/2
192 if oi[mid].Last >= ix {
193 hi = mid
194 } else {
195 lo = mid + 1
196 }
197 }
198 if hi == len(oi) {
199 return false
200 }
201 return oi[hi].First <= ix
202 }
203
204 func parseIndexesFromString(logger klog.Logger, indexesStr string, completions int) orderedIntervals {
205 if indexesStr == "" {
206 return nil
207 }
208 var result orderedIntervals
209 var lastInterval *interval
210 for _, intervalStr := range strings.Split(indexesStr, ",") {
211 limitsStr := strings.Split(intervalStr, "-")
212 var inter interval
213 var err error
214 inter.First, err = strconv.Atoi(limitsStr[0])
215 if err != nil {
216 logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err)
217 continue
218 }
219 if inter.First >= completions {
220 break
221 }
222 if len(limitsStr) > 1 {
223 inter.Last, err = strconv.Atoi(limitsStr[1])
224 if err != nil {
225 logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err)
226 continue
227 }
228 if inter.Last >= completions {
229 inter.Last = completions - 1
230 }
231 } else {
232 inter.Last = inter.First
233 }
234 if lastInterval != nil && lastInterval.Last == inter.First-1 {
235 lastInterval.Last = inter.Last
236 } else {
237 result = append(result, inter)
238 lastInterval = &result[len(result)-1]
239 }
240 }
241 return result
242 }
243
244
245
246
247 func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
248 if count == 0 {
249 return nil
250 }
251 active := getIndexes(jobCtx.activePods)
252 result := make([]int, 0, count)
253 nonPending := jobCtx.succeededIndexes.withOrderedIndexes(sets.List(active))
254 if onlyReplaceFailedPods(jobCtx.job) {
255 terminating := getIndexes(controller.FilterTerminatingPods(jobCtx.pods))
256 nonPending = nonPending.withOrderedIndexes(sets.List(terminating))
257 }
258 if jobCtx.failedIndexes != nil {
259 nonPending = nonPending.merge(*jobCtx.failedIndexes)
260 }
261
262 candidate := 0
263 for _, sInterval := range nonPending {
264 for ; candidate < completions && len(result) < count && candidate < sInterval.First; candidate++ {
265 result = append(result, candidate)
266 }
267 if candidate < sInterval.Last+1 {
268 candidate = sInterval.Last + 1
269 }
270 }
271 for ; candidate < completions && len(result) < count; candidate++ {
272 result = append(result, candidate)
273 }
274 return result
275 }
276
277
278 func getIndexes(pods []*v1.Pod) sets.Set[int] {
279 result := sets.New[int]()
280 for _, p := range pods {
281 ix := getCompletionIndex(p.Annotations)
282 if ix != unknownCompletionIndex {
283 result.Insert(ix)
284 }
285 }
286 return result
287 }
288
289
290
291
292
293
294
295 func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod, completions int) ([]*v1.Pod, []*v1.Pod) {
296 sort.Sort(byCompletionIndex(pods))
297 lastIndex := unknownCompletionIndex
298 firstRepeatPos := 0
299 countLooped := 0
300 for i, p := range pods {
301 ix := getCompletionIndex(p.Annotations)
302 if ix >= completions {
303 rm = append(rm, pods[i:]...)
304 break
305 }
306 if ix != lastIndex {
307 rm, left = appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:i], lastIndex)
308 firstRepeatPos = i
309 lastIndex = ix
310 }
311 countLooped += 1
312 }
313 return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:countLooped], lastIndex)
314 }
315
316
317
318
319
320
321
322
323 func getPodsWithDelayedDeletionPerIndex(logger klog.Logger, jobCtx *syncJobCtx) map[int]*v1.Pod {
324
325
326
327 activeIndexes := getIndexes(jobCtx.activePods)
328
329 podsWithDelayedDeletionPerIndex := make(map[int]*v1.Pod)
330 getValidPodsWithFilter(jobCtx, nil, func(p *v1.Pod) bool {
331 if isPodFailed(p, jobCtx.job) {
332 if ix := getCompletionIndex(p.Annotations); ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) {
333 if jobCtx.succeededIndexes.has(ix) || jobCtx.failedIndexes.has(ix) || activeIndexes.Has(ix) {
334 return false
335 }
336 if lastPodWithDelayedDeletion, ok := podsWithDelayedDeletionPerIndex[ix]; ok {
337 if getIndexAbsoluteFailureCount(logger, lastPodWithDelayedDeletion) <= getIndexAbsoluteFailureCount(logger, p) && !getFinishedTime(p).Before(getFinishedTime(lastPodWithDelayedDeletion)) {
338 podsWithDelayedDeletionPerIndex[ix] = p
339 }
340 } else {
341 podsWithDelayedDeletionPerIndex[ix] = p
342 }
343 }
344 }
345 return false
346 })
347 return podsWithDelayedDeletionPerIndex
348 }
349
350 func addIndexFailureCountAnnotation(logger klog.Logger, template *v1.PodTemplateSpec, job *batch.Job, podBeingReplaced *v1.Pod) {
351 indexFailureCount, indexIgnoredFailureCount := getNewIndexFailureCounts(logger, job, podBeingReplaced)
352 template.Annotations[batch.JobIndexFailureCountAnnotation] = strconv.Itoa(int(indexFailureCount))
353 if indexIgnoredFailureCount > 0 {
354 template.Annotations[batch.JobIndexIgnoredFailureCountAnnotation] = strconv.Itoa(int(indexIgnoredFailureCount))
355 }
356 }
357
358
359
360 func getNewIndexFailureCounts(logger klog.Logger, job *batch.Job, podBeingReplaced *v1.Pod) (int32, int32) {
361 if podBeingReplaced != nil {
362 indexFailureCount := parseIndexFailureCountAnnotation(logger, podBeingReplaced)
363 indexIgnoredFailureCount := parseIndexFailureIgnoreCountAnnotation(logger, podBeingReplaced)
364 if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
365 _, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, podBeingReplaced)
366 if countFailed {
367 indexFailureCount++
368 } else {
369 indexIgnoredFailureCount++
370 }
371 } else {
372 indexFailureCount++
373 }
374 return indexFailureCount, indexIgnoredFailureCount
375 }
376 return 0, 0
377 }
378
379 func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) {
380 if ix == unknownCompletionIndex {
381 rm = append(rm, pods...)
382 return rm, left
383 }
384 if len(pods) == 1 {
385 left = append(left, pods[0])
386 return rm, left
387 }
388 sort.Sort(controller.ActivePods(pods))
389 rm = append(rm, pods[:len(pods)-1]...)
390 left = append(left, pods[len(pods)-1])
391 return rm, left
392 }
393
394 func getCompletionIndex(annotations map[string]string) int {
395 if annotations == nil {
396 return unknownCompletionIndex
397 }
398 v, ok := annotations[batch.JobCompletionIndexAnnotation]
399 if !ok {
400 return unknownCompletionIndex
401 }
402 i, err := strconv.Atoi(v)
403 if err != nil {
404 return unknownCompletionIndex
405 }
406 if i < 0 {
407 return unknownCompletionIndex
408 }
409 return i
410 }
411
412
413
414
415
416
417
418
419
420 func getIndexFailureCount(logger klog.Logger, pod *v1.Pod) int32 {
421 return parseIndexFailureCountAnnotation(logger, pod)
422 }
423
424 func getIndexAbsoluteFailureCount(logger klog.Logger, pod *v1.Pod) int32 {
425 return parseIndexFailureCountAnnotation(logger, pod) + parseIndexFailureIgnoreCountAnnotation(logger, pod)
426 }
427
428 func parseIndexFailureCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 {
429 if value, ok := pod.Annotations[batch.JobIndexFailureCountAnnotation]; ok {
430 return parseInt32(logger, value)
431 }
432 logger.V(3).Info("There is no expected annotation", "annotationKey", batch.JobIndexFailureCountAnnotation, "pod", klog.KObj(pod), "podUID", pod.UID)
433 return 0
434 }
435
436 func parseIndexFailureIgnoreCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 {
437 if value, ok := pod.Annotations[batch.JobIndexIgnoredFailureCountAnnotation]; ok {
438 return parseInt32(logger, value)
439 }
440 return 0
441 }
442
443 func parseInt32(logger klog.Logger, vStr string) int32 {
444 if vInt, err := strconv.Atoi(vStr); err != nil {
445 logger.Error(err, "Failed to parse the value", "value", vStr)
446 return 0
447 } else if vInt < 0 || vInt > math.MaxInt32 {
448 logger.Info("The value is invalid", "value", vInt)
449 return 0
450 } else {
451 return int32(vInt)
452 }
453 }
454
455 func addCompletionIndexEnvVariables(template *v1.PodTemplateSpec) {
456 for i := range template.Spec.InitContainers {
457 addCompletionIndexEnvVariable(&template.Spec.InitContainers[i])
458 }
459 for i := range template.Spec.Containers {
460 addCompletionIndexEnvVariable(&template.Spec.Containers[i])
461 }
462 }
463
464 func addCompletionIndexEnvVariable(container *v1.Container) {
465 for _, v := range container.Env {
466 if v.Name == completionIndexEnvName {
467 return
468 }
469 }
470 var fieldPath string
471 if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
472 fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation)
473 } else {
474 fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation)
475 }
476 container.Env = append(container.Env, v1.EnvVar{
477 Name: completionIndexEnvName,
478 ValueFrom: &v1.EnvVarSource{
479 FieldRef: &v1.ObjectFieldSelector{
480 FieldPath: fieldPath,
481 },
482 },
483 })
484 }
485
486 func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) {
487 if template.Annotations == nil {
488 template.Annotations = make(map[string]string, 1)
489 }
490 template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
491 }
492
493 func addCompletionIndexLabel(template *v1.PodTemplateSpec, index int) {
494 if template.Labels == nil {
495 template.Labels = make(map[string]string, 1)
496 }
497
498 template.Labels[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
499 }
500
501 func podGenerateNameWithIndex(jobName string, index int) string {
502 appendIndex := "-" + strconv.Itoa(index) + "-"
503 generateNamePrefix := jobName + appendIndex
504 if len(generateNamePrefix) > names.MaxGeneratedNameLength {
505 generateNamePrefix = generateNamePrefix[:names.MaxGeneratedNameLength-len(appendIndex)] + appendIndex
506 }
507 return generateNamePrefix
508 }
509
510 type byCompletionIndex []*v1.Pod
511
512 func (bci byCompletionIndex) Less(i, j int) bool {
513 return getCompletionIndex(bci[i].Annotations) < getCompletionIndex(bci[j].Annotations)
514 }
515
516 func (bci byCompletionIndex) Swap(i, j int) {
517 bci[i], bci[j] = bci[j], bci[i]
518 }
519
520 func (bci byCompletionIndex) Len() int {
521 return len(bci)
522 }
523
524 func completionModeStr(job *batch.Job) string {
525 if job.Spec.CompletionMode != nil {
526 return string(*job.Spec.CompletionMode)
527 }
528 return string(batch.NonIndexedCompletion)
529 }
530
View as plain text