1
16
17 package job
18
19 import (
20 "fmt"
21 "sort"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/client-go/tools/cache"
26 "k8s.io/klog/v2"
27 apipod "k8s.io/kubernetes/pkg/api/v1/pod"
28 "k8s.io/utils/clock"
29 "k8s.io/utils/ptr"
30 )
31
// backoffRecord holds the failure-backoff bookkeeping kept for a single key.
type backoffRecord struct {
	// key identifies the record; backoffRecordKeyFunc returns it as the store key.
	key string
	// failuresAfterLastSuccess is the number of failures counted since the
	// most recent success (reset to zero whenever a new success is observed).
	failuresAfterLastSuccess int32
	// lastFailureTime is the finish time of the latest counted failure,
	// or nil when no failure is currently recorded.
	lastFailureTime *time.Time
}
37
38 type backoffStore struct {
39 store cache.Store
40 }
41
42 func (s *backoffStore) updateBackoffRecord(record backoffRecord) error {
43 b, ok, err := s.store.GetByKey(record.key)
44 if err != nil {
45 return err
46 }
47
48 if !ok {
49 err = s.store.Add(&record)
50 if err != nil {
51 return err
52 }
53 } else {
54 backoffRecord := b.(*backoffRecord)
55 backoffRecord.failuresAfterLastSuccess = record.failuresAfterLastSuccess
56 backoffRecord.lastFailureTime = record.lastFailureTime
57 }
58
59 return nil
60 }
61
62 func (s *backoffStore) removeBackoffRecord(jobId string) error {
63 b, ok, err := s.store.GetByKey(jobId)
64 if err != nil {
65 return err
66 }
67
68 if ok {
69 err = s.store.Delete(b)
70 if err != nil {
71 return err
72 }
73 }
74
75 return nil
76
77 }
78
79 func newBackoffStore() *backoffStore {
80 return &backoffStore{
81 store: cache.NewStore(backoffRecordKeyFunc),
82 }
83 }
84
85 var backoffRecordKeyFunc = func(obj interface{}) (string, error) {
86 if u, ok := obj.(*backoffRecord); ok {
87 return u.key, nil
88 }
89 return "", fmt.Errorf("could not find key for obj %#v", obj)
90 }
91
92 func (backoffRecordStore *backoffStore) newBackoffRecord(key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord {
93 var backoff *backoffRecord
94
95 if b, exists, _ := backoffRecordStore.store.GetByKey(key); exists {
96 old := b.(*backoffRecord)
97 backoff = &backoffRecord{
98 key: old.key,
99 failuresAfterLastSuccess: old.failuresAfterLastSuccess,
100 lastFailureTime: old.lastFailureTime,
101 }
102 } else {
103 backoff = &backoffRecord{
104 key: key,
105 failuresAfterLastSuccess: 0,
106 lastFailureTime: nil,
107 }
108 }
109
110 sortByFinishedTime(newSucceededPods)
111 sortByFinishedTime(newFailedPods)
112
113 if len(newSucceededPods) == 0 {
114 if len(newFailedPods) == 0 {
115 return *backoff
116 }
117
118 backoff.failuresAfterLastSuccess = backoff.failuresAfterLastSuccess + int32(len(newFailedPods))
119 lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1])
120 backoff.lastFailureTime = &lastFailureTime
121 return *backoff
122
123 } else {
124 if len(newFailedPods) == 0 {
125 backoff.failuresAfterLastSuccess = 0
126 backoff.lastFailureTime = nil
127 return *backoff
128 }
129
130 backoff.failuresAfterLastSuccess = 0
131 backoff.lastFailureTime = nil
132
133 lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1])
134 for i := len(newFailedPods) - 1; i >= 0; i-- {
135 failedTime := getFinishedTime(newFailedPods[i])
136 if !failedTime.After(lastSuccessTime) {
137 break
138 }
139 if backoff.lastFailureTime == nil {
140 backoff.lastFailureTime = &failedTime
141 }
142 backoff.failuresAfterLastSuccess += 1
143 }
144
145 return *backoff
146
147 }
148
149 }
150
151 func sortByFinishedTime(pods []*v1.Pod) {
152 sort.Slice(pods, func(i, j int) bool {
153 p1 := pods[i]
154 p2 := pods[j]
155 p1FinishTime := getFinishedTime(p1)
156 p2FinishTime := getFinishedTime(p2)
157
158 return p1FinishTime.Before(p2FinishTime)
159 })
160 }
161
162
163
164
165
166
167
168
169
170
171 func getFinishedTime(p *v1.Pod) time.Time {
172 if finishTime := getFinishTimeFromContainers(p); finishTime != nil {
173 return *finishTime
174 }
175 if finishTime := getFinishTimeFromPodReadyFalseCondition(p); finishTime != nil {
176 return *finishTime
177 }
178 if finishTime := getFinishTimeFromDeletionTimestamp(p); finishTime != nil {
179 return *finishTime
180 }
181
182 return p.CreationTimestamp.Time
183 }
184
185 func getFinishTimeFromContainers(p *v1.Pod) *time.Time {
186 var finishTime *time.Time
187 for _, containerState := range p.Status.ContainerStatuses {
188 if containerState.State.Terminated == nil {
189 return nil
190 }
191 if containerState.State.Terminated.FinishedAt.Time.IsZero() {
192 return nil
193 }
194 if finishTime == nil || finishTime.Before(containerState.State.Terminated.FinishedAt.Time) {
195 finishTime = &containerState.State.Terminated.FinishedAt.Time
196 }
197 }
198 return finishTime
199 }
200
201 func getFinishTimeFromPodReadyFalseCondition(p *v1.Pod) *time.Time {
202 if _, c := apipod.GetPodCondition(&p.Status, v1.PodReady); c != nil && c.Status == v1.ConditionFalse && !c.LastTransitionTime.Time.IsZero() {
203 return &c.LastTransitionTime.Time
204 }
205 return nil
206 }
207
208 func getFinishTimeFromDeletionTimestamp(p *v1.Pod) *time.Time {
209 if p.DeletionTimestamp != nil {
210 finishTime := p.DeletionTimestamp.Time.Add(-time.Duration(ptr.Deref(p.DeletionGracePeriodSeconds, 0)) * time.Second)
211 return &finishTime
212 }
213 return nil
214 }
215
216 func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration {
217 return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, backoff.failuresAfterLastSuccess, backoff.lastFailureTime)
218 }
219
220
221
222
223
224
225 func getRemainingTimePerIndex(logger klog.Logger, clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, lastFailedPod *v1.Pod) time.Duration {
226 if lastFailedPod == nil {
227
228 return time.Duration(0)
229 }
230 failureCount := getIndexAbsoluteFailureCount(logger, lastFailedPod) + 1
231 lastFailureTime := getFinishedTime(lastFailedPod)
232 return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, failureCount, &lastFailureTime)
233 }
234
235 func getRemainingTimeForFailuresCount(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, failuresCount int32, lastFailureTime *time.Time) time.Duration {
236 if failuresCount == 0 {
237 return 0
238 }
239
240 backoffDuration := defaultBackoff
241 for i := 1; i < int(failuresCount); i++ {
242 backoffDuration = backoffDuration * 2
243 if backoffDuration >= maxBackoff {
244 backoffDuration = maxBackoff
245 break
246 }
247 }
248
249 timeElapsedSinceLastFailure := clock.Since(*lastFailureTime)
250
251 if backoffDuration < timeElapsedSinceLastFailure {
252 return 0
253 }
254
255 return backoffDuration - timeElapsedSinceLastFailure
256 }
257
View as plain text