...

Source file src/k8s.io/kubernetes/pkg/controller/job/indexed_job_utils.go

Documentation: k8s.io/kubernetes/pkg/controller/job

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package job
    18  
    19  import (
    20  	"fmt"
    21  	"math"
    22  	"sort"
    23  	"strconv"
    24  	"strings"
    25  
    26  	batch "k8s.io/api/batch/v1"
    27  	v1 "k8s.io/api/core/v1"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	"k8s.io/apiserver/pkg/storage/names"
    30  	"k8s.io/apiserver/pkg/util/feature"
    31  	"k8s.io/klog/v2"
    32  	"k8s.io/kubernetes/pkg/controller"
    33  	"k8s.io/kubernetes/pkg/features"
    34  )
    35  
const (
	// completionIndexEnvName is the name of the environment variable that
	// addCompletionIndexEnvVariable appends to a container so the workload
	// can read its completion index.
	completionIndexEnvName = "JOB_COMPLETION_INDEX"
	// unknownCompletionIndex is the sentinel returned by getCompletionIndex
	// when the completion index annotation is missing, is not an integer, or
	// is negative.
	unknownCompletionIndex = -1
)
    40  
    41  func isIndexedJob(job *batch.Job) bool {
    42  	return job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion
    43  }
    44  
    45  func hasBackoffLimitPerIndex(job *batch.Job) bool {
    46  	return feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) && job.Spec.BackoffLimitPerIndex != nil
    47  }
    48  
// interval is a closed range of completion indexes: both First and Last are
// part of the range (a single index is represented with First == Last).
type interval struct {
	First int
	Last  int
}

// orderedIntervals is a compressed list of indexes. The methods defined on it
// (merge, has) rely on the intervals being sorted in increasing order.
type orderedIntervals []interval
    55  
    56  // calculateSucceededIndexes returns the old and new list of succeeded indexes
    57  // in compressed format (intervals).
    58  // The old list is solely based off .status.completedIndexes, but returns an
    59  // empty list if this Job is not tracked with finalizers. The new list includes
    60  // the indexes that succeeded since the last sync.
    61  func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
    62  	prevIntervals := parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
    63  	newSucceeded := sets.New[int]()
    64  	for _, p := range pods {
    65  		ix := getCompletionIndex(p.Annotations)
    66  		// Succeeded Pod with valid index and, if tracking with finalizers,
    67  		// has a finalizer (meaning that it is not counted yet).
    68  		if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) {
    69  			newSucceeded.Insert(ix)
    70  		}
    71  	}
    72  	// List returns the items of the set in order.
    73  	result := prevIntervals.withOrderedIndexes(sets.List(newSucceeded))
    74  	return prevIntervals, result
    75  }
    76  
    77  // calculateFailedIndexes returns the list of failed indexes in compressed
    78  // format (intervals). The list includes indexes already present in
    79  // .status.failedIndexes and indexes that failed since the last sync.
    80  func calculateFailedIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) *orderedIntervals {
    81  	var prevIntervals orderedIntervals
    82  	if job.Status.FailedIndexes != nil {
    83  		prevIntervals = parseIndexesFromString(logger, *job.Status.FailedIndexes, int(*job.Spec.Completions))
    84  	}
    85  	newFailed := sets.New[int]()
    86  	for _, p := range pods {
    87  		ix := getCompletionIndex(p.Annotations)
    88  		// Failed Pod with valid index and has a finalizer (meaning that it is not counted yet).
    89  		if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) && isIndexFailed(logger, job, p) {
    90  			newFailed.Insert(ix)
    91  		}
    92  	}
    93  	// List returns the items of the set in order.
    94  	result := prevIntervals.withOrderedIndexes(sets.List(newFailed))
    95  	return &result
    96  }
    97  
    98  func isIndexFailed(logger klog.Logger, job *batch.Job, pod *v1.Pod) bool {
    99  	isPodFailedCounted := false
   100  	if isPodFailed(pod, job) {
   101  		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
   102  			_, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
   103  			if action != nil && *action == batch.PodFailurePolicyActionFailIndex {
   104  				return true
   105  			}
   106  			isPodFailedCounted = countFailed
   107  		} else {
   108  			isPodFailedCounted = true
   109  		}
   110  	}
   111  	return isPodFailedCounted && getIndexFailureCount(logger, pod) >= *job.Spec.BackoffLimitPerIndex
   112  }
   113  
   114  // withOrderedIndexes returns a new list of ordered intervals that contains
   115  // the newIndexes, provided in increasing order.
   116  func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals {
   117  	newIndexIntervals := make(orderedIntervals, len(newIndexes))
   118  	for i, newIndex := range newIndexes {
   119  		newIndexIntervals[i] = interval{newIndex, newIndex}
   120  	}
   121  	return oi.merge(newIndexIntervals)
   122  }
   123  
   124  // with returns a new list of ordered intervals that contains the newOrderedIntervals.
   125  func (oi orderedIntervals) merge(newOi orderedIntervals) orderedIntervals {
   126  	var result orderedIntervals
   127  	i := 0
   128  	j := 0
   129  	var lastInterval *interval
   130  	appendOrMergeWithLastInterval := func(thisInterval interval) {
   131  		if lastInterval == nil || thisInterval.First > lastInterval.Last+1 {
   132  			result = append(result, thisInterval)
   133  			lastInterval = &result[len(result)-1]
   134  		} else if lastInterval.Last < thisInterval.Last {
   135  			lastInterval.Last = thisInterval.Last
   136  		}
   137  	}
   138  	for i < len(oi) && j < len(newOi) {
   139  		if oi[i].First < newOi[j].First {
   140  			appendOrMergeWithLastInterval(oi[i])
   141  			i++
   142  		} else {
   143  			appendOrMergeWithLastInterval(newOi[j])
   144  			j++
   145  		}
   146  	}
   147  	for i < len(oi) {
   148  		appendOrMergeWithLastInterval(oi[i])
   149  		i++
   150  	}
   151  	for j < len(newOi) {
   152  		appendOrMergeWithLastInterval(newOi[j])
   153  		j++
   154  	}
   155  	return result
   156  }
   157  
   158  // total returns number of indexes contained in the intervals.
   159  func (oi orderedIntervals) total() int {
   160  	var count int
   161  	for _, iv := range oi {
   162  		count += iv.Last - iv.First + 1
   163  	}
   164  	return count
   165  }
   166  
   167  func (oi orderedIntervals) String() string {
   168  	var builder strings.Builder
   169  	for _, v := range oi {
   170  		if builder.Len() > 0 {
   171  			builder.WriteRune(',')
   172  		}
   173  		builder.WriteString(strconv.Itoa(v.First))
   174  		if v.Last > v.First {
   175  			if v.Last == v.First+1 {
   176  				builder.WriteRune(',')
   177  			} else {
   178  				builder.WriteRune('-')
   179  			}
   180  			builder.WriteString(strconv.Itoa(v.Last))
   181  		}
   182  	}
   183  	return builder.String()
   184  }
   185  
   186  func (oi orderedIntervals) has(ix int) bool {
   187  	lo := 0
   188  	hi := len(oi)
   189  	// Invariant: oi[hi].Last >= ix
   190  	for hi > lo {
   191  		mid := lo + (hi-lo)/2
   192  		if oi[mid].Last >= ix {
   193  			hi = mid
   194  		} else {
   195  			lo = mid + 1
   196  		}
   197  	}
   198  	if hi == len(oi) {
   199  		return false
   200  	}
   201  	return oi[hi].First <= ix
   202  }
   203  
   204  func parseIndexesFromString(logger klog.Logger, indexesStr string, completions int) orderedIntervals {
   205  	if indexesStr == "" {
   206  		return nil
   207  	}
   208  	var result orderedIntervals
   209  	var lastInterval *interval
   210  	for _, intervalStr := range strings.Split(indexesStr, ",") {
   211  		limitsStr := strings.Split(intervalStr, "-")
   212  		var inter interval
   213  		var err error
   214  		inter.First, err = strconv.Atoi(limitsStr[0])
   215  		if err != nil {
   216  			logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err)
   217  			continue
   218  		}
   219  		if inter.First >= completions {
   220  			break
   221  		}
   222  		if len(limitsStr) > 1 {
   223  			inter.Last, err = strconv.Atoi(limitsStr[1])
   224  			if err != nil {
   225  				logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err)
   226  				continue
   227  			}
   228  			if inter.Last >= completions {
   229  				inter.Last = completions - 1
   230  			}
   231  		} else {
   232  			inter.Last = inter.First
   233  		}
   234  		if lastInterval != nil && lastInterval.Last == inter.First-1 {
   235  			lastInterval.Last = inter.Last
   236  		} else {
   237  			result = append(result, inter)
   238  			lastInterval = &result[len(result)-1]
   239  		}
   240  	}
   241  	return result
   242  }
   243  
   244  // firstPendingIndexes returns `count` indexes less than `completions` that are
   245  // not covered by `activePods`, `succeededIndexes` or `failedIndexes`.
   246  // In cases of PodReplacementPolicy as Failed we will include `terminatingPods` in this list.
   247  func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
   248  	if count == 0 {
   249  		return nil
   250  	}
   251  	active := getIndexes(jobCtx.activePods)
   252  	result := make([]int, 0, count)
   253  	nonPending := jobCtx.succeededIndexes.withOrderedIndexes(sets.List(active))
   254  	if onlyReplaceFailedPods(jobCtx.job) {
   255  		terminating := getIndexes(controller.FilterTerminatingPods(jobCtx.pods))
   256  		nonPending = nonPending.withOrderedIndexes(sets.List(terminating))
   257  	}
   258  	if jobCtx.failedIndexes != nil {
   259  		nonPending = nonPending.merge(*jobCtx.failedIndexes)
   260  	}
   261  	// The following algorithm is bounded by len(nonPending) and count.
   262  	candidate := 0
   263  	for _, sInterval := range nonPending {
   264  		for ; candidate < completions && len(result) < count && candidate < sInterval.First; candidate++ {
   265  			result = append(result, candidate)
   266  		}
   267  		if candidate < sInterval.Last+1 {
   268  			candidate = sInterval.Last + 1
   269  		}
   270  	}
   271  	for ; candidate < completions && len(result) < count; candidate++ {
   272  		result = append(result, candidate)
   273  	}
   274  	return result
   275  }
   276  
   277  // Returns the list of indexes corresponding to the set of pods
   278  func getIndexes(pods []*v1.Pod) sets.Set[int] {
   279  	result := sets.New[int]()
   280  	for _, p := range pods {
   281  		ix := getCompletionIndex(p.Annotations)
   282  		if ix != unknownCompletionIndex {
   283  			result.Insert(ix)
   284  		}
   285  	}
   286  	return result
   287  }
   288  
   289  // appendDuplicatedIndexPodsForRemoval scans active `pods` for duplicated
   290  // completion indexes. For each index, it selects n-1 pods for removal, where n
   291  // is the number of repetitions. The pods to be removed are appended to `rm`,
   292  // while the remaining pods are appended to `left`.
   293  // All pods that don't have a completion index are appended to `rm`.
   294  // All pods with index not in valid range are appended to `rm`.
   295  func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod, completions int) ([]*v1.Pod, []*v1.Pod) {
   296  	sort.Sort(byCompletionIndex(pods))
   297  	lastIndex := unknownCompletionIndex
   298  	firstRepeatPos := 0
   299  	countLooped := 0
   300  	for i, p := range pods {
   301  		ix := getCompletionIndex(p.Annotations)
   302  		if ix >= completions {
   303  			rm = append(rm, pods[i:]...)
   304  			break
   305  		}
   306  		if ix != lastIndex {
   307  			rm, left = appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:i], lastIndex)
   308  			firstRepeatPos = i
   309  			lastIndex = ix
   310  		}
   311  		countLooped += 1
   312  	}
   313  	return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:countLooped], lastIndex)
   314  }
   315  
   316  // getPodsWithDelayedDeletionPerIndex returns the pod which removal is delayed
   317  // in order to await for recreation. This map is used when BackoffLimitPerIndex
   318  // is enabled to delay pod finalizer removal, and thus pod deletion, until the
   319  // replacement pod is created. The pod deletion is delayed so that the
   320  // replacement pod can have the batch.kubernetes.io/job-index-failure-count
   321  // annotation set properly keeping track of the number of failed pods within
   322  // the index.
   323  func getPodsWithDelayedDeletionPerIndex(logger klog.Logger, jobCtx *syncJobCtx) map[int]*v1.Pod {
   324  	// the failed pods corresponding to currently active indexes can be safely
   325  	// deleted as the failure count annotation is present in the currently
   326  	// active pods.
   327  	activeIndexes := getIndexes(jobCtx.activePods)
   328  
   329  	podsWithDelayedDeletionPerIndex := make(map[int]*v1.Pod)
   330  	getValidPodsWithFilter(jobCtx, nil, func(p *v1.Pod) bool {
   331  		if isPodFailed(p, jobCtx.job) {
   332  			if ix := getCompletionIndex(p.Annotations); ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) {
   333  				if jobCtx.succeededIndexes.has(ix) || jobCtx.failedIndexes.has(ix) || activeIndexes.Has(ix) {
   334  					return false
   335  				}
   336  				if lastPodWithDelayedDeletion, ok := podsWithDelayedDeletionPerIndex[ix]; ok {
   337  					if getIndexAbsoluteFailureCount(logger, lastPodWithDelayedDeletion) <= getIndexAbsoluteFailureCount(logger, p) && !getFinishedTime(p).Before(getFinishedTime(lastPodWithDelayedDeletion)) {
   338  						podsWithDelayedDeletionPerIndex[ix] = p
   339  					}
   340  				} else {
   341  					podsWithDelayedDeletionPerIndex[ix] = p
   342  				}
   343  			}
   344  		}
   345  		return false
   346  	})
   347  	return podsWithDelayedDeletionPerIndex
   348  }
   349  
   350  func addIndexFailureCountAnnotation(logger klog.Logger, template *v1.PodTemplateSpec, job *batch.Job, podBeingReplaced *v1.Pod) {
   351  	indexFailureCount, indexIgnoredFailureCount := getNewIndexFailureCounts(logger, job, podBeingReplaced)
   352  	template.Annotations[batch.JobIndexFailureCountAnnotation] = strconv.Itoa(int(indexFailureCount))
   353  	if indexIgnoredFailureCount > 0 {
   354  		template.Annotations[batch.JobIndexIgnoredFailureCountAnnotation] = strconv.Itoa(int(indexIgnoredFailureCount))
   355  	}
   356  }
   357  
   358  // getNewIndexFailureCount returns the value of the index-failure-count
   359  // annotation for the new pod being created
   360  func getNewIndexFailureCounts(logger klog.Logger, job *batch.Job, podBeingReplaced *v1.Pod) (int32, int32) {
   361  	if podBeingReplaced != nil {
   362  		indexFailureCount := parseIndexFailureCountAnnotation(logger, podBeingReplaced)
   363  		indexIgnoredFailureCount := parseIndexFailureIgnoreCountAnnotation(logger, podBeingReplaced)
   364  		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
   365  			_, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, podBeingReplaced)
   366  			if countFailed {
   367  				indexFailureCount++
   368  			} else {
   369  				indexIgnoredFailureCount++
   370  			}
   371  		} else {
   372  			indexFailureCount++
   373  		}
   374  		return indexFailureCount, indexIgnoredFailureCount
   375  	}
   376  	return 0, 0
   377  }
   378  
   379  func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) {
   380  	if ix == unknownCompletionIndex {
   381  		rm = append(rm, pods...)
   382  		return rm, left
   383  	}
   384  	if len(pods) == 1 {
   385  		left = append(left, pods[0])
   386  		return rm, left
   387  	}
   388  	sort.Sort(controller.ActivePods(pods))
   389  	rm = append(rm, pods[:len(pods)-1]...)
   390  	left = append(left, pods[len(pods)-1])
   391  	return rm, left
   392  }
   393  
   394  func getCompletionIndex(annotations map[string]string) int {
   395  	if annotations == nil {
   396  		return unknownCompletionIndex
   397  	}
   398  	v, ok := annotations[batch.JobCompletionIndexAnnotation]
   399  	if !ok {
   400  		return unknownCompletionIndex
   401  	}
   402  	i, err := strconv.Atoi(v)
   403  	if err != nil {
   404  		return unknownCompletionIndex
   405  	}
   406  	if i < 0 {
   407  		return unknownCompletionIndex
   408  	}
   409  	return i
   410  }
   411  
   412  // getIndexFailureCount returns the value of the batch.kubernetes.io/job-index-failure-count
   413  // annotation as int32. It fallbacks to 0 when:
   414  //   - there is no annotation - for example the pod was created when the BackoffLimitPerIndex
   415  //     feature was temporarily disabled, or the annotation was manually removed by the user,
   416  //   - the value of the annotation isn't parsable as int - for example because
   417  //     it was set by a malicious user,
   418  //   - the value of the annotation is negative or greater by int32 - for example
   419  //     because it was set by a malicious user.
   420  func getIndexFailureCount(logger klog.Logger, pod *v1.Pod) int32 {
   421  	return parseIndexFailureCountAnnotation(logger, pod)
   422  }
   423  
   424  func getIndexAbsoluteFailureCount(logger klog.Logger, pod *v1.Pod) int32 {
   425  	return parseIndexFailureCountAnnotation(logger, pod) + parseIndexFailureIgnoreCountAnnotation(logger, pod)
   426  }
   427  
   428  func parseIndexFailureCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 {
   429  	if value, ok := pod.Annotations[batch.JobIndexFailureCountAnnotation]; ok {
   430  		return parseInt32(logger, value)
   431  	}
   432  	logger.V(3).Info("There is no expected annotation", "annotationKey", batch.JobIndexFailureCountAnnotation, "pod", klog.KObj(pod), "podUID", pod.UID)
   433  	return 0
   434  }
   435  
   436  func parseIndexFailureIgnoreCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 {
   437  	if value, ok := pod.Annotations[batch.JobIndexIgnoredFailureCountAnnotation]; ok {
   438  		return parseInt32(logger, value)
   439  	}
   440  	return 0
   441  }
   442  
   443  func parseInt32(logger klog.Logger, vStr string) int32 {
   444  	if vInt, err := strconv.Atoi(vStr); err != nil {
   445  		logger.Error(err, "Failed to parse the value", "value", vStr)
   446  		return 0
   447  	} else if vInt < 0 || vInt > math.MaxInt32 {
   448  		logger.Info("The value is invalid", "value", vInt)
   449  		return 0
   450  	} else {
   451  		return int32(vInt)
   452  	}
   453  }
   454  
   455  func addCompletionIndexEnvVariables(template *v1.PodTemplateSpec) {
   456  	for i := range template.Spec.InitContainers {
   457  		addCompletionIndexEnvVariable(&template.Spec.InitContainers[i])
   458  	}
   459  	for i := range template.Spec.Containers {
   460  		addCompletionIndexEnvVariable(&template.Spec.Containers[i])
   461  	}
   462  }
   463  
   464  func addCompletionIndexEnvVariable(container *v1.Container) {
   465  	for _, v := range container.Env {
   466  		if v.Name == completionIndexEnvName {
   467  			return
   468  		}
   469  	}
   470  	var fieldPath string
   471  	if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
   472  		fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation)
   473  	} else {
   474  		fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation)
   475  	}
   476  	container.Env = append(container.Env, v1.EnvVar{
   477  		Name: completionIndexEnvName,
   478  		ValueFrom: &v1.EnvVarSource{
   479  			FieldRef: &v1.ObjectFieldSelector{
   480  				FieldPath: fieldPath,
   481  			},
   482  		},
   483  	})
   484  }
   485  
   486  func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) {
   487  	if template.Annotations == nil {
   488  		template.Annotations = make(map[string]string, 1)
   489  	}
   490  	template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
   491  }
   492  
   493  func addCompletionIndexLabel(template *v1.PodTemplateSpec, index int) {
   494  	if template.Labels == nil {
   495  		template.Labels = make(map[string]string, 1)
   496  	}
   497  	// For consistency, we use the annotation batch.kubernetes.io/job-completion-index for the corresponding label as well.
   498  	template.Labels[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
   499  }
   500  
   501  func podGenerateNameWithIndex(jobName string, index int) string {
   502  	appendIndex := "-" + strconv.Itoa(index) + "-"
   503  	generateNamePrefix := jobName + appendIndex
   504  	if len(generateNamePrefix) > names.MaxGeneratedNameLength {
   505  		generateNamePrefix = generateNamePrefix[:names.MaxGeneratedNameLength-len(appendIndex)] + appendIndex
   506  	}
   507  	return generateNamePrefix
   508  }
   509  
   510  type byCompletionIndex []*v1.Pod
   511  
   512  func (bci byCompletionIndex) Less(i, j int) bool {
   513  	return getCompletionIndex(bci[i].Annotations) < getCompletionIndex(bci[j].Annotations)
   514  }
   515  
   516  func (bci byCompletionIndex) Swap(i, j int) {
   517  	bci[i], bci[j] = bci[j], bci[i]
   518  }
   519  
   520  func (bci byCompletionIndex) Len() int {
   521  	return len(bci)
   522  }
   523  
   524  func completionModeStr(job *batch.Job) string {
   525  	if job.Spec.CompletionMode != nil {
   526  		return string(*job.Spec.CompletionMode)
   527  	}
   528  	return string(batch.NonIndexedCompletion)
   529  }
   530  

View as plain text