1
16
17 package cronjob
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "sort"
24 "strings"
25 "time"
26
27 "github.com/robfig/cron/v3"
28
29 batchv1 "k8s.io/api/batch/v1"
30 corev1 "k8s.io/api/core/v1"
31 "k8s.io/apimachinery/pkg/api/errors"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/labels"
34 "k8s.io/apimachinery/pkg/runtime"
35 "k8s.io/apimachinery/pkg/types"
36 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
37 "k8s.io/apimachinery/pkg/util/wait"
38 batchv1informers "k8s.io/client-go/informers/batch/v1"
39 clientset "k8s.io/client-go/kubernetes"
40 "k8s.io/client-go/kubernetes/scheme"
41 covev1client "k8s.io/client-go/kubernetes/typed/core/v1"
42 batchv1listers "k8s.io/client-go/listers/batch/v1"
43 "k8s.io/client-go/tools/cache"
44 "k8s.io/client-go/tools/record"
45 ref "k8s.io/client-go/tools/reference"
46 "k8s.io/client-go/util/workqueue"
47 "k8s.io/klog/v2"
48 "k8s.io/kubernetes/pkg/controller"
49 "k8s.io/kubernetes/pkg/controller/cronjob/metrics"
50 "k8s.io/utils/pointer"
51 )
52
53 var (
54
55 controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
56
57 nextScheduleDelta = 100 * time.Millisecond
58 )
59
60
61
62 type ControllerV2 struct {
63 queue workqueue.RateLimitingInterface
64
65 kubeClient clientset.Interface
66 recorder record.EventRecorder
67 broadcaster record.EventBroadcaster
68
69 jobControl jobControlInterface
70 cronJobControl cjControlInterface
71
72 jobLister batchv1listers.JobLister
73 cronJobLister batchv1listers.CronJobLister
74
75 jobListerSynced cache.InformerSynced
76 cronJobListerSynced cache.InformerSynced
77
78
79 now func() time.Time
80 }
81
82
83 func NewControllerV2(ctx context.Context, jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
84 logger := klog.FromContext(ctx)
85 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
86
87 jm := &ControllerV2{
88 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
89 kubeClient: kubeClient,
90 broadcaster: eventBroadcaster,
91 recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),
92
93 jobControl: realJobControl{KubeClient: kubeClient},
94 cronJobControl: &realCJControl{KubeClient: kubeClient},
95
96 jobLister: jobInformer.Lister(),
97 cronJobLister: cronJobsInformer.Lister(),
98
99 jobListerSynced: jobInformer.Informer().HasSynced,
100 cronJobListerSynced: cronJobsInformer.Informer().HasSynced,
101 now: time.Now,
102 }
103
104 jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
105 AddFunc: jm.addJob,
106 UpdateFunc: jm.updateJob,
107 DeleteFunc: jm.deleteJob,
108 })
109
110 cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
111 AddFunc: func(obj interface{}) {
112 jm.enqueueController(obj)
113 },
114 UpdateFunc: func(oldObj, newObj interface{}) {
115 jm.updateCronJob(logger, oldObj, newObj)
116 },
117 DeleteFunc: func(obj interface{}) {
118 jm.enqueueController(obj)
119 },
120 })
121
122 metrics.Register()
123
124 return jm, nil
125 }
126
127
128 func (jm *ControllerV2) Run(ctx context.Context, workers int) {
129 defer utilruntime.HandleCrash()
130
131
132 jm.broadcaster.StartStructuredLogging(3)
133 jm.broadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
134 defer jm.broadcaster.Shutdown()
135
136 defer jm.queue.ShutDown()
137
138 logger := klog.FromContext(ctx)
139 logger.Info("Starting cronjob controller v2")
140 defer logger.Info("Shutting down cronjob controller v2")
141
142 if !cache.WaitForNamedCacheSync("cronjob", ctx.Done(), jm.jobListerSynced, jm.cronJobListerSynced) {
143 return
144 }
145
146 for i := 0; i < workers; i++ {
147 go wait.UntilWithContext(ctx, jm.worker, time.Second)
148 }
149
150 <-ctx.Done()
151 }
152
153 func (jm *ControllerV2) worker(ctx context.Context) {
154 for jm.processNextWorkItem(ctx) {
155 }
156 }
157
158 func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool {
159 key, quit := jm.queue.Get()
160 if quit {
161 return false
162 }
163 defer jm.queue.Done(key)
164
165 requeueAfter, err := jm.sync(ctx, key.(string))
166 switch {
167 case err != nil:
168 utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err))
169 jm.queue.AddRateLimited(key)
170 case requeueAfter != nil:
171 jm.queue.Forget(key)
172 jm.queue.AddAfter(key, *requeueAfter)
173 }
174 return true
175 }
176
177 func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Duration, error) {
178 ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)
179 if err != nil {
180 return nil, err
181 }
182 logger := klog.FromContext(ctx)
183 cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
184 switch {
185 case errors.IsNotFound(err):
186
187 logger.V(4).Info("CronJob not found, may be it is deleted", "cronjob", klog.KObj(cronJob), "err", err)
188 return nil, nil
189 case err != nil:
190
191 return nil, err
192 }
193
194 jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob)
195 if err != nil {
196 return nil, err
197 }
198
199
200
201 cronJobCopy := cronJob.DeepCopy()
202
203 updateStatusAfterCleanup := jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled)
204
205 requeueAfter, updateStatusAfterSync, syncErr := jm.syncCronJob(ctx, cronJobCopy, jobsToBeReconciled)
206 if syncErr != nil {
207 logger.V(2).Info("Error reconciling cronjob", "cronjob", klog.KObj(cronJob), "err", syncErr)
208 }
209
210
211 if updateStatusAfterCleanup || updateStatusAfterSync {
212 if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
213 logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
214 return nil, err
215 }
216 }
217
218 if requeueAfter != nil {
219 logger.V(4).Info("Re-queuing cronjob", "cronjob", klog.KObj(cronJob), "requeueAfter", requeueAfter)
220 return requeueAfter, nil
221 }
222
223 return nil, syncErr
224 }
225
226
227
228
229 func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batchv1.CronJob {
230
231
232 if controllerRef.Kind != controllerKind.Kind {
233 return nil
234 }
235 cronJob, err := jm.cronJobLister.CronJobs(namespace).Get(controllerRef.Name)
236 if err != nil {
237 return nil
238 }
239 if cronJob.UID != controllerRef.UID {
240
241
242 return nil
243 }
244 return cronJob
245 }
246
247 func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1.CronJob) ([]*batchv1.Job, error) {
248
249
250 jobList, err := jm.jobLister.Jobs(cronJob.Namespace).List(labels.Everything())
251 if err != nil {
252 return nil, err
253 }
254
255 jobsToBeReconciled := []*batchv1.Job{}
256
257 for _, job := range jobList {
258
259 if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && controllerRef.Name == cronJob.Name {
260
261 jobsToBeReconciled = append(jobsToBeReconciled, job)
262 }
263 }
264 return jobsToBeReconciled, nil
265 }
266
267
268 func (jm *ControllerV2) addJob(obj interface{}) {
269 job := obj.(*batchv1.Job)
270 if job.DeletionTimestamp != nil {
271
272
273 jm.deleteJob(job)
274 return
275 }
276
277
278 if controllerRef := metav1.GetControllerOf(job); controllerRef != nil {
279 cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)
280 if cronJob == nil {
281 return
282 }
283 jm.enqueueController(cronJob)
284 return
285 }
286 }
287
288
289
290
291
292 func (jm *ControllerV2) updateJob(old, cur interface{}) {
293 curJob := cur.(*batchv1.Job)
294 oldJob := old.(*batchv1.Job)
295 if curJob.ResourceVersion == oldJob.ResourceVersion {
296
297
298 return
299 }
300
301 curControllerRef := metav1.GetControllerOf(curJob)
302 oldControllerRef := metav1.GetControllerOf(oldJob)
303 controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
304 if controllerRefChanged && oldControllerRef != nil {
305
306 if cronJob := jm.resolveControllerRef(oldJob.Namespace, oldControllerRef); cronJob != nil {
307 jm.enqueueController(cronJob)
308 }
309 }
310
311
312 if curControllerRef != nil {
313 cronJob := jm.resolveControllerRef(curJob.Namespace, curControllerRef)
314 if cronJob == nil {
315 return
316 }
317 jm.enqueueController(cronJob)
318 return
319 }
320 }
321
322 func (jm *ControllerV2) deleteJob(obj interface{}) {
323 job, ok := obj.(*batchv1.Job)
324
325
326
327
328 if !ok {
329 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
330 if !ok {
331 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
332 return
333 }
334 job, ok = tombstone.Obj.(*batchv1.Job)
335 if !ok {
336 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Job %#v", obj))
337 return
338 }
339 }
340
341 controllerRef := metav1.GetControllerOf(job)
342 if controllerRef == nil {
343
344 return
345 }
346 cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)
347 if cronJob == nil {
348 return
349 }
350 jm.enqueueController(cronJob)
351 }
352
353 func (jm *ControllerV2) enqueueController(obj interface{}) {
354 key, err := controller.KeyFunc(obj)
355 if err != nil {
356 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
357 return
358 }
359
360 jm.queue.Add(key)
361 }
362
363 func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration) {
364 key, err := controller.KeyFunc(obj)
365 if err != nil {
366 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
367 return
368 }
369
370 jm.queue.AddAfter(key, t)
371 }
372
373
374
375 func (jm *ControllerV2) updateCronJob(logger klog.Logger, old interface{}, curr interface{}) {
376 oldCJ, okOld := old.(*batchv1.CronJob)
377 newCJ, okNew := curr.(*batchv1.CronJob)
378
379 if !okOld || !okNew {
380
381 return
382 }
383
384
385
386 if oldCJ.Spec.Schedule != newCJ.Spec.Schedule || !pointer.StringEqual(oldCJ.Spec.TimeZone, newCJ.Spec.TimeZone) {
387
388 sched, err := cron.ParseStandard(formatSchedule(newCJ, nil))
389 if err != nil {
390
391
392 logger.V(2).Info("Unparseable schedule for cronjob", "cronjob", klog.KObj(newCJ), "schedule", newCJ.Spec.Schedule, "err", err)
393 jm.recorder.Eventf(newCJ, corev1.EventTypeWarning, "UnParseableCronJobSchedule", "unparseable schedule for cronjob: %s", newCJ.Spec.Schedule)
394 return
395 }
396 now := jm.now()
397 t := nextScheduleTimeDuration(newCJ, now, sched)
398
399 jm.enqueueControllerAfter(curr, *t)
400 return
401 }
402
403
404
405
406
407
408 jm.enqueueController(curr)
409 }
410
411
412
413
414
415 func (jm *ControllerV2) syncCronJob(
416 ctx context.Context,
417 cronJob *batchv1.CronJob,
418 jobs []*batchv1.Job) (*time.Duration, bool, error) {
419
420 now := jm.now()
421 updateStatus := false
422
423 childrenJobs := make(map[types.UID]bool)
424 for _, j := range jobs {
425 childrenJobs[j.ObjectMeta.UID] = true
426 found := inActiveList(cronJob, j.ObjectMeta.UID)
427 if !found && !IsJobFinished(j) {
428 cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
429 if err != nil {
430 return nil, updateStatus, err
431 }
432 if inActiveList(cjCopy, j.ObjectMeta.UID) {
433 cronJob = cjCopy
434 continue
435 }
436 jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
437
438
439
440
441 } else if found && IsJobFinished(j) {
442 _, status := getFinishedStatus(j)
443 deleteFromActiveList(cronJob, j.ObjectMeta.UID)
444 jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
445 updateStatus = true
446 } else if IsJobSucceeded(j) {
447
448 if cronJob.Status.LastSuccessfulTime == nil {
449 cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
450 updateStatus = true
451 }
452 if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cronJob.Status.LastSuccessfulTime.Time) {
453 cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
454 updateStatus = true
455 }
456 }
457 }
458
459
460
461
462 for _, j := range cronJob.Status.Active {
463 _, found := childrenJobs[j.UID]
464 if found {
465 continue
466 }
467
468
469 _, err := jm.jobControl.GetJob(j.Namespace, j.Name)
470 switch {
471 case errors.IsNotFound(err):
472
473
474 jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
475 deleteFromActiveList(cronJob, j.UID)
476 updateStatus = true
477 case err != nil:
478 return nil, updateStatus, err
479 }
480
481 }
482
483 if cronJob.DeletionTimestamp != nil {
484
485
486 return nil, updateStatus, nil
487 }
488
489 logger := klog.FromContext(ctx)
490 if cronJob.Spec.TimeZone != nil {
491 timeZone := pointer.StringDeref(cronJob.Spec.TimeZone, "")
492 if _, err := time.LoadLocation(timeZone); err != nil {
493 logger.V(4).Info("Not starting job because timeZone is invalid", "cronjob", klog.KObj(cronJob), "timeZone", timeZone, "err", err)
494 jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
495 return nil, updateStatus, nil
496 }
497 }
498
499 if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
500 logger.V(4).Info("Not starting job because the cron is suspended", "cronjob", klog.KObj(cronJob))
501 return nil, updateStatus, nil
502 }
503
504 sched, err := cron.ParseStandard(formatSchedule(cronJob, jm.recorder))
505 if err != nil {
506
507
508 logger.V(2).Info("Unparseable schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
509 jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err)
510 return nil, updateStatus, nil
511 }
512
513 scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
514 if err != nil {
515
516
517 logger.V(2).Info("Invalid schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
518 jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
519 return nil, updateStatus, nil
520 }
521 if scheduledTime == nil {
522
523
524
525
526 logger.V(4).Info("No unmet start times", "cronjob", klog.KObj(cronJob))
527 t := nextScheduleTimeDuration(cronJob, now, sched)
528 return t, updateStatus, nil
529 }
530
531 tooLate := false
532 if cronJob.Spec.StartingDeadlineSeconds != nil {
533 tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
534 }
535 if tooLate {
536 logger.V(4).Info("Missed starting window", "cronjob", klog.KObj(cronJob))
537 jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
538
539
540
541
542
543
544
545
546 t := nextScheduleTimeDuration(cronJob, now, sched)
547 return t, updateStatus, nil
548 }
549 if inActiveListByName(cronJob, &batchv1.Job{
550 ObjectMeta: metav1.ObjectMeta{
551 Name: getJobName(cronJob, *scheduledTime),
552 Namespace: cronJob.Namespace,
553 }}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
554 logger.V(4).Info("Not starting job because the scheduled time is already processed", "cronjob", klog.KObj(cronJob), "schedule", scheduledTime)
555 t := nextScheduleTimeDuration(cronJob, now, sched)
556 return t, updateStatus, nil
557 }
558 if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
559
560
561
562
563
564
565
566
567
568 logger.V(4).Info("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KObj(cronJob))
569 jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
570 t := nextScheduleTimeDuration(cronJob, now, sched)
571 return t, updateStatus, nil
572 }
573 if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
574 for _, j := range cronJob.Status.Active {
575 logger.V(4).Info("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
576 job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
577 if err != nil {
578 jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
579 return nil, updateStatus, err
580 }
581 if !deleteJob(logger, cronJob, job, jm.jobControl, jm.recorder) {
582 return nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
583 }
584 updateStatus = true
585 }
586 }
587
588 jobAlreadyExists := false
589 jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
590 if err != nil {
591 logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob))
592 return nil, updateStatus, err
593 }
594 jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
595 switch {
596 case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
597
598
599 return nil, updateStatus, err
600 case errors.IsAlreadyExists(err):
601
602
603
604
605 jobAlreadyExists = true
606 job, err := jm.jobControl.GetJob(jobReq.GetNamespace(), jobReq.GetName())
607 if err != nil {
608 return nil, updateStatus, err
609 }
610 jobResp = job
611
612
613
614 if !metav1.IsControlledBy(job, cronJob) {
615 return nil, updateStatus, nil
616 }
617
618
619 found := inActiveList(cronJob, job.ObjectMeta.UID)
620 if found {
621 return nil, updateStatus, nil
622 }
623 case err != nil:
624
625 jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
626 return nil, updateStatus, err
627 }
628
629 if jobAlreadyExists {
630 logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
631 } else {
632 metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
633 logger.V(4).Info("Created Job", "job", klog.KObj(jobResp), "cronjob", klog.KObj(cronJob))
634 jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
635 }
636
637
638
639
640
641
642
643
644
645
646
647
648 jobRef, err := getRef(jobResp)
649 if err != nil {
650 logger.V(2).Info("Unable to make object reference", "cronjob", klog.KObj(cronJob), "err", err)
651 return nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
652 }
653 cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
654 cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
655 updateStatus = true
656
657 t := nextScheduleTimeDuration(cronJob, now, sched)
658 return t, updateStatus, nil
659 }
660
661 func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string {
662 return fmt.Sprintf("%s-%d", cj.Name, getTimeHashInMinutes(scheduledTime))
663 }
664
665
666
667 func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) bool {
668
669 if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
670 return false
671 }
672
673 updateStatus := false
674 failedJobs := []*batchv1.Job{}
675 successfulJobs := []*batchv1.Job{}
676
677 for _, job := range js {
678 isFinished, finishedStatus := jm.getFinishedStatus(job)
679 if isFinished && finishedStatus == batchv1.JobComplete {
680 successfulJobs = append(successfulJobs, job)
681 } else if isFinished && finishedStatus == batchv1.JobFailed {
682 failedJobs = append(failedJobs, job)
683 }
684 }
685
686 if cj.Spec.SuccessfulJobsHistoryLimit != nil &&
687 jm.removeOldestJobs(ctx, cj,
688 successfulJobs,
689 *cj.Spec.SuccessfulJobsHistoryLimit) {
690 updateStatus = true
691 }
692
693 if cj.Spec.FailedJobsHistoryLimit != nil &&
694 jm.removeOldestJobs(ctx, cj,
695 failedJobs,
696 *cj.Spec.FailedJobsHistoryLimit) {
697 updateStatus = true
698 }
699
700 return updateStatus
701 }
702
703 func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
704 for _, c := range j.Status.Conditions {
705 if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
706 return true, c.Type
707 }
708 }
709 return false, ""
710 }
711
712
713 func (jm *ControllerV2) removeOldestJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) bool {
714 updateStatus := false
715 numToDelete := len(js) - int(maxJobs)
716 if numToDelete <= 0 {
717 return updateStatus
718 }
719 logger := klog.FromContext(ctx)
720 logger.V(4).Info("Cleaning up jobs from CronJob list", "deletejobnum", numToDelete, "jobnum", len(js), "cronjob", klog.KObj(cj))
721
722 sort.Sort(byJobStartTime(js))
723 for i := 0; i < numToDelete; i++ {
724 logger.V(4).Info("Removing job from CronJob list", "job", js[i].Name, "cronjob", klog.KObj(cj))
725 if deleteJob(logger, cj, js[i], jm.jobControl, jm.recorder) {
726 updateStatus = true
727 }
728 }
729 return updateStatus
730 }
731
732
733 func deleteJob(logger klog.Logger, cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
734
735 if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
736 recorder.Eventf(cj, corev1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
737 logger.Error(err, "Error deleting job from cronjob", "job", klog.KObj(job), "cronjob", klog.KObj(cj))
738 return false
739 }
740
741 deleteFromActiveList(cj, job.ObjectMeta.UID)
742 recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
743
744 return true
745 }
746
747 func getRef(object runtime.Object) (*corev1.ObjectReference, error) {
748 return ref.GetReference(scheme.Scheme, object)
749 }
750
751 func formatSchedule(cj *batchv1.CronJob, recorder record.EventRecorder) string {
752 if strings.Contains(cj.Spec.Schedule, "TZ") {
753 if recorder != nil {
754 recorder.Eventf(cj, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", cj.Spec.Schedule)
755 }
756
757 return cj.Spec.Schedule
758 }
759
760 if cj.Spec.TimeZone != nil {
761 if _, err := time.LoadLocation(*cj.Spec.TimeZone); err != nil {
762 return cj.Spec.Schedule
763 }
764
765 return fmt.Sprintf("TZ=%s %s", *cj.Spec.TimeZone, cj.Spec.Schedule)
766 }
767
768 return cj.Spec.Schedule
769 }
770
View as plain text