...

Source file src/k8s.io/kubernetes/pkg/controller/job/backoff_utils.go

Documentation: k8s.io/kubernetes/pkg/controller/job

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package job
    18  
    19  import (
    20  	"fmt"
    21  	"sort"
    22  	"time"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	"k8s.io/client-go/tools/cache"
    26  	"k8s.io/klog/v2"
    27  	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
    28  	"k8s.io/utils/clock"
    29  	"k8s.io/utils/ptr"
    30  )
    31  
    // backoffRecord holds the failure bookkeeping kept in the backoffStore
    // under a single key.
    type backoffRecord struct {
    	// key identifies the record; backoffRecordKeyFunc returns it as the
    	// store key.
    	key string

    	// failuresAfterLastSuccess is the number of failed pods counted since
    	// the most recent succeeded pod, as maintained by newBackoffRecord.
    	failuresAfterLastSuccess int32

    	// lastFailureTime is the finish time of the latest counted failure. It
    	// is nil while no failure is counted.
    	lastFailureTime *time.Time
    }
    37  
    38  type backoffStore struct {
    39  	store cache.Store
    40  }
    41  
    42  func (s *backoffStore) updateBackoffRecord(record backoffRecord) error {
    43  	b, ok, err := s.store.GetByKey(record.key)
    44  	if err != nil {
    45  		return err
    46  	}
    47  
    48  	if !ok {
    49  		err = s.store.Add(&record)
    50  		if err != nil {
    51  			return err
    52  		}
    53  	} else {
    54  		backoffRecord := b.(*backoffRecord)
    55  		backoffRecord.failuresAfterLastSuccess = record.failuresAfterLastSuccess
    56  		backoffRecord.lastFailureTime = record.lastFailureTime
    57  	}
    58  
    59  	return nil
    60  }
    61  
    62  func (s *backoffStore) removeBackoffRecord(jobId string) error {
    63  	b, ok, err := s.store.GetByKey(jobId)
    64  	if err != nil {
    65  		return err
    66  	}
    67  
    68  	if ok {
    69  		err = s.store.Delete(b)
    70  		if err != nil {
    71  			return err
    72  		}
    73  	}
    74  
    75  	return nil
    76  
    77  }
    78  
    79  func newBackoffStore() *backoffStore {
    80  	return &backoffStore{
    81  		store: cache.NewStore(backoffRecordKeyFunc),
    82  	}
    83  }
    84  
    85  var backoffRecordKeyFunc = func(obj interface{}) (string, error) {
    86  	if u, ok := obj.(*backoffRecord); ok {
    87  		return u.key, nil
    88  	}
    89  	return "", fmt.Errorf("could not find key for obj %#v", obj)
    90  }
    91  
    92  func (backoffRecordStore *backoffStore) newBackoffRecord(key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord {
    93  	var backoff *backoffRecord
    94  
    95  	if b, exists, _ := backoffRecordStore.store.GetByKey(key); exists {
    96  		old := b.(*backoffRecord)
    97  		backoff = &backoffRecord{
    98  			key:                      old.key,
    99  			failuresAfterLastSuccess: old.failuresAfterLastSuccess,
   100  			lastFailureTime:          old.lastFailureTime,
   101  		}
   102  	} else {
   103  		backoff = &backoffRecord{
   104  			key:                      key,
   105  			failuresAfterLastSuccess: 0,
   106  			lastFailureTime:          nil,
   107  		}
   108  	}
   109  
   110  	sortByFinishedTime(newSucceededPods)
   111  	sortByFinishedTime(newFailedPods)
   112  
   113  	if len(newSucceededPods) == 0 {
   114  		if len(newFailedPods) == 0 {
   115  			return *backoff
   116  		}
   117  
   118  		backoff.failuresAfterLastSuccess = backoff.failuresAfterLastSuccess + int32(len(newFailedPods))
   119  		lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1])
   120  		backoff.lastFailureTime = &lastFailureTime
   121  		return *backoff
   122  
   123  	} else {
   124  		if len(newFailedPods) == 0 {
   125  			backoff.failuresAfterLastSuccess = 0
   126  			backoff.lastFailureTime = nil
   127  			return *backoff
   128  		}
   129  
   130  		backoff.failuresAfterLastSuccess = 0
   131  		backoff.lastFailureTime = nil
   132  
   133  		lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1])
   134  		for i := len(newFailedPods) - 1; i >= 0; i-- {
   135  			failedTime := getFinishedTime(newFailedPods[i])
   136  			if !failedTime.After(lastSuccessTime) {
   137  				break
   138  			}
   139  			if backoff.lastFailureTime == nil {
   140  				backoff.lastFailureTime = &failedTime
   141  			}
   142  			backoff.failuresAfterLastSuccess += 1
   143  		}
   144  
   145  		return *backoff
   146  
   147  	}
   148  
   149  }
   150  
   151  func sortByFinishedTime(pods []*v1.Pod) {
   152  	sort.Slice(pods, func(i, j int) bool {
   153  		p1 := pods[i]
   154  		p2 := pods[j]
   155  		p1FinishTime := getFinishedTime(p1)
   156  		p2FinishTime := getFinishedTime(p2)
   157  
   158  		return p1FinishTime.Before(p2FinishTime)
   159  	})
   160  }
   161  
   162  // Returns the pod finish time using the following lookups:
   163  // 1. if all containers finished, use the latest time
   164  // 2. if the pod has Ready=False condition, use the last transition time
   165  // 3. if the pod has been deleted, use the `deletionTimestamp - grace_period` to estimate the moment of deletion
   166  // 4. fallback to pod's creation time
   167  //
   168  // Pods owned by Kubelet are marked with Ready=False condition when
   169  // transitioning to terminal phase, thus being handled by (1.) or (2.).
   170  // Orphaned pods are deleted by PodGC, thus being handled by (3.).
   171  func getFinishedTime(p *v1.Pod) time.Time {
   172  	if finishTime := getFinishTimeFromContainers(p); finishTime != nil {
   173  		return *finishTime
   174  	}
   175  	if finishTime := getFinishTimeFromPodReadyFalseCondition(p); finishTime != nil {
   176  		return *finishTime
   177  	}
   178  	if finishTime := getFinishTimeFromDeletionTimestamp(p); finishTime != nil {
   179  		return *finishTime
   180  	}
   181  	// This should not happen in clusters with Kubelet and PodGC running.
   182  	return p.CreationTimestamp.Time
   183  }
   184  
   185  func getFinishTimeFromContainers(p *v1.Pod) *time.Time {
   186  	var finishTime *time.Time
   187  	for _, containerState := range p.Status.ContainerStatuses {
   188  		if containerState.State.Terminated == nil {
   189  			return nil
   190  		}
   191  		if containerState.State.Terminated.FinishedAt.Time.IsZero() {
   192  			return nil
   193  		}
   194  		if finishTime == nil || finishTime.Before(containerState.State.Terminated.FinishedAt.Time) {
   195  			finishTime = &containerState.State.Terminated.FinishedAt.Time
   196  		}
   197  	}
   198  	return finishTime
   199  }
   200  
   201  func getFinishTimeFromPodReadyFalseCondition(p *v1.Pod) *time.Time {
   202  	if _, c := apipod.GetPodCondition(&p.Status, v1.PodReady); c != nil && c.Status == v1.ConditionFalse && !c.LastTransitionTime.Time.IsZero() {
   203  		return &c.LastTransitionTime.Time
   204  	}
   205  	return nil
   206  }
   207  
   208  func getFinishTimeFromDeletionTimestamp(p *v1.Pod) *time.Time {
   209  	if p.DeletionTimestamp != nil {
   210  		finishTime := p.DeletionTimestamp.Time.Add(-time.Duration(ptr.Deref(p.DeletionGracePeriodSeconds, 0)) * time.Second)
   211  		return &finishTime
   212  	}
   213  	return nil
   214  }
   215  
   216  func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration {
   217  	return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, backoff.failuresAfterLastSuccess, backoff.lastFailureTime)
   218  }
   219  
   220  // getRemainingTimePerIndex returns the remaining time left for a given index to
   221  // create the replacement pods. The number of consecutive pod failures for the
   222  // index is retrieved from the `job-index-failure-count` annotation of the
   223  // last failed pod within the index (represented by `lastFailedPod`).
   224  // The last failed pod is also used to determine the time of the last failure.
   225  func getRemainingTimePerIndex(logger klog.Logger, clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, lastFailedPod *v1.Pod) time.Duration {
   226  	if lastFailedPod == nil {
   227  		// There is no previous failed pod for this index
   228  		return time.Duration(0)
   229  	}
   230  	failureCount := getIndexAbsoluteFailureCount(logger, lastFailedPod) + 1
   231  	lastFailureTime := getFinishedTime(lastFailedPod)
   232  	return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, failureCount, &lastFailureTime)
   233  }
   234  
   235  func getRemainingTimeForFailuresCount(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, failuresCount int32, lastFailureTime *time.Time) time.Duration {
   236  	if failuresCount == 0 {
   237  		return 0
   238  	}
   239  
   240  	backoffDuration := defaultBackoff
   241  	for i := 1; i < int(failuresCount); i++ {
   242  		backoffDuration = backoffDuration * 2
   243  		if backoffDuration >= maxBackoff {
   244  			backoffDuration = maxBackoff
   245  			break
   246  		}
   247  	}
   248  
   249  	timeElapsedSinceLastFailure := clock.Since(*lastFailureTime)
   250  
   251  	if backoffDuration < timeElapsedSinceLastFailure {
   252  		return 0
   253  	}
   254  
   255  	return backoffDuration - timeElapsedSinceLastFailure
   256  }
   257  

View as plain text