1
16
17 package cronjob
18
19 import (
20 "fmt"
21 "time"
22
23 "github.com/robfig/cron/v3"
24 "k8s.io/utils/pointer"
25
26 batchv1 "k8s.io/api/batch/v1"
27 corev1 "k8s.io/api/core/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 "k8s.io/apimachinery/pkg/types"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 "k8s.io/client-go/tools/record"
33 "k8s.io/klog/v2"
34 "k8s.io/kubernetes/pkg/features"
35 )
36
37
38
// missedSchedulesType classifies how many schedule times were missed
// when computing the most recent schedule time of a CronJob.
type missedSchedulesType int

const (
	// noneMissed is the zero value: no missed schedule was detected.
	noneMissed missedSchedulesType = iota
	// fewMissed reports that at least one schedule was missed.
	fewMissed
	// manyMissed reports that a large number of schedules were missed.
	manyMissed
)

// String returns a short human-readable name for the value, or
// "unknown(N)" for values outside the declared constants.
func (e missedSchedulesType) String() string {
	names := [...]string{
		noneMissed: "none",
		fewMissed:  "few",
		manyMissed: "many",
	}
	if e >= 0 && int(e) < len(names) {
		return names[e]
	}
	return fmt.Sprintf("unknown(%d)", int(e))
}
59
60
61 func inActiveList(cj *batchv1.CronJob, uid types.UID) bool {
62 for _, j := range cj.Status.Active {
63 if j.UID == uid {
64 return true
65 }
66 }
67 return false
68 }
69
70
71
72 func inActiveListByName(cj *batchv1.CronJob, job *batchv1.Job) bool {
73 for _, j := range cj.Status.Active {
74 if j.Name == job.Name && j.Namespace == job.Namespace {
75 return true
76 }
77 }
78 return false
79 }
80
81 func deleteFromActiveList(cj *batchv1.CronJob, uid types.UID) {
82 if cj == nil {
83 return
84 }
85
86
87 newActive := []corev1.ObjectReference{}
88 for _, j := range cj.Status.Active {
89 if j.UID != uid {
90 newActive = append(newActive, j)
91 }
92 }
93 cj.Status.Active = newActive
94 }
95
96
97
98
99
100
101
102 func mostRecentScheduleTime(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, includeStartingDeadlineSeconds bool) (time.Time, *time.Time, missedSchedulesType, error) {
103 earliestTime := cj.ObjectMeta.CreationTimestamp.Time
104 missedSchedules := noneMissed
105 if cj.Status.LastScheduleTime != nil {
106 earliestTime = cj.Status.LastScheduleTime.Time
107 }
108 if includeStartingDeadlineSeconds && cj.Spec.StartingDeadlineSeconds != nil {
109
110 schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))
111
112 if schedulingDeadline.After(earliestTime) {
113 earliestTime = schedulingDeadline
114 }
115 }
116
117 t1 := schedule.Next(earliestTime)
118 t2 := schedule.Next(t1)
119
120 if now.Before(t1) {
121 return earliestTime, nil, missedSchedules, nil
122 }
123 if now.Before(t2) {
124 return earliestTime, &t1, missedSchedules, nil
125 }
126
127
128
129
130 timeBetweenTwoSchedules := int64(t2.Sub(t1).Round(time.Second).Seconds())
131 if timeBetweenTwoSchedules < 1 {
132 return earliestTime, nil, missedSchedules, fmt.Errorf("time difference between two schedules is less than 1 second")
133 }
134
135
136
137 timeElapsed := int64(now.Sub(t1).Seconds())
138 numberOfMissedSchedules := (timeElapsed / timeBetweenTwoSchedules) + 1
139
140 var mostRecentTime time.Time
141
142
143
144
145
146
147
148
149 potentialEarliest := t1.Add(time.Duration((numberOfMissedSchedules-1-1)*timeBetweenTwoSchedules) * time.Second)
150 for t := schedule.Next(potentialEarliest); !t.After(now); t = schedule.Next(t) {
151 mostRecentTime = t
152 }
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 switch {
172 case numberOfMissedSchedules > 100:
173 missedSchedules = manyMissed
174
175 case numberOfMissedSchedules > 0:
176 missedSchedules = fewMissed
177 }
178
179 if mostRecentTime.IsZero() {
180 return earliestTime, nil, missedSchedules, nil
181 }
182 return earliestTime, &mostRecentTime, missedSchedules, nil
183 }
184
185
186
187
188
189
190 func nextScheduleTimeDuration(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule) *time.Duration {
191 earliestTime, mostRecentTime, missedSchedules, err := mostRecentScheduleTime(cj, now, schedule, false)
192 if err != nil {
193
194 mostRecentTime = &now
195 } else if mostRecentTime == nil {
196 if missedSchedules == noneMissed {
197
198 mostRecentTime = &earliestTime
199 } else {
200
201 mostRecentTime = &now
202 }
203 }
204
205 t := schedule.Next(*mostRecentTime).Add(nextScheduleDelta).Sub(now)
206 return &t
207 }
208
209
210
211
212 func nextScheduleTime(logger klog.Logger, cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
213 _, mostRecentTime, missedSchedules, err := mostRecentScheduleTime(cj, now, schedule, true)
214
215 if mostRecentTime == nil || mostRecentTime.After(now) {
216 return nil, err
217 }
218
219 if missedSchedules == manyMissed {
220 recorder.Eventf(cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times. Set or decrease .spec.startingDeadlineSeconds or check clock skew")
221 logger.Info("too many missed times", "cronjob", klog.KObj(cj))
222 }
223
224 return mostRecentTime, err
225 }
226
227 func copyLabels(template *batchv1.JobTemplateSpec) labels.Set {
228 l := make(labels.Set)
229 for k, v := range template.Labels {
230 l[k] = v
231 }
232 return l
233 }
234
235 func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set {
236 a := make(labels.Set)
237 for k, v := range template.Annotations {
238 a[k] = v
239 }
240 return a
241 }
242
243
244
245
246 func getJobFromTemplate2(cj *batchv1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
247 labels := copyLabels(&cj.Spec.JobTemplate)
248 annotations := copyAnnotations(&cj.Spec.JobTemplate)
249
250 name := getJobName(cj, scheduledTime)
251
252 if utilfeature.DefaultFeatureGate.Enabled(features.CronJobsScheduledAnnotation) {
253
254 timeZoneLocation, err := time.LoadLocation(pointer.StringDeref(cj.Spec.TimeZone, ""))
255 if err != nil {
256 return nil, err
257 }
258
259 annotations[batchv1.CronJobScheduledTimestampAnnotation] = scheduledTime.In(timeZoneLocation).Format(time.RFC3339)
260 }
261
262 job := &batchv1.Job{
263 ObjectMeta: metav1.ObjectMeta{
264 Labels: labels,
265 Annotations: annotations,
266 Name: name,
267 CreationTimestamp: metav1.Time{Time: scheduledTime},
268 OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(cj, controllerKind)},
269 },
270 }
271 cj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
272 return job, nil
273 }
274
275
// getTimeHashInMinutes returns scheduledTime as whole minutes since the Unix
// epoch, using integer division (truncation toward zero).
func getTimeHashInMinutes(scheduledTime time.Time) int64 {
	const secondsPerMinute = 60
	unixSeconds := scheduledTime.Unix()
	return unixSeconds / secondsPerMinute
}
279
280 func getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
281 for _, c := range j.Status.Conditions {
282 if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
283 return true, c.Type
284 }
285 }
286 return false, ""
287 }
288
289
290 func IsJobFinished(j *batchv1.Job) bool {
291 isFinished, _ := getFinishedStatus(j)
292 return isFinished
293 }
294
295
296 func IsJobSucceeded(j *batchv1.Job) bool {
297 for _, c := range j.Status.Conditions {
298 if c.Type == batchv1.JobComplete && c.Status == corev1.ConditionTrue {
299 return true
300 }
301 }
302 return false
303 }
304
305
306 type byJobStartTime []*batchv1.Job
307
308 func (o byJobStartTime) Len() int { return len(o) }
309 func (o byJobStartTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
310
311 func (o byJobStartTime) Less(i, j int) bool {
312 if o[i].Status.StartTime == nil && o[j].Status.StartTime != nil {
313 return false
314 }
315 if o[i].Status.StartTime != nil && o[j].Status.StartTime == nil {
316 return true
317 }
318 if o[i].Status.StartTime.Equal(o[j].Status.StartTime) {
319 return o[i].Name < o[j].Name
320 }
321 return o[i].Status.StartTime.Before(o[j].Status.StartTime)
322 }
323
View as plain text