...

Source file src/k8s.io/kubernetes/pkg/controller/cronjob/cronjob_controllerv2.go

Documentation: k8s.io/kubernetes/pkg/controller/cronjob

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cronjob
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"sort"
    24  	"strings"
    25  	"time"
    26  
    27  	"github.com/robfig/cron/v3"
    28  
    29  	batchv1 "k8s.io/api/batch/v1"
    30  	corev1 "k8s.io/api/core/v1"
    31  	"k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	"k8s.io/apimachinery/pkg/runtime"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    37  	"k8s.io/apimachinery/pkg/util/wait"
    38  	batchv1informers "k8s.io/client-go/informers/batch/v1"
    39  	clientset "k8s.io/client-go/kubernetes"
    40  	"k8s.io/client-go/kubernetes/scheme"
    41  	covev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    42  	batchv1listers "k8s.io/client-go/listers/batch/v1"
    43  	"k8s.io/client-go/tools/cache"
    44  	"k8s.io/client-go/tools/record"
    45  	ref "k8s.io/client-go/tools/reference"
    46  	"k8s.io/client-go/util/workqueue"
    47  	"k8s.io/klog/v2"
    48  	"k8s.io/kubernetes/pkg/controller"
    49  	"k8s.io/kubernetes/pkg/controller/cronjob/metrics"
    50  	"k8s.io/utils/pointer"
    51  )
    52  
    53  var (
    54  	// controllerKind contains the schema.GroupVersionKind for this controller type.
    55  	controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
    56  
    57  	nextScheduleDelta = 100 * time.Millisecond
    58  )
    59  
    60  // ControllerV2 is a controller for CronJobs.
    61  // Refactored Cronjob controller that uses DelayingQueue and informers
    62  type ControllerV2 struct {
    63  	queue workqueue.RateLimitingInterface
    64  
    65  	kubeClient  clientset.Interface
    66  	recorder    record.EventRecorder
    67  	broadcaster record.EventBroadcaster
    68  
    69  	jobControl     jobControlInterface
    70  	cronJobControl cjControlInterface
    71  
    72  	jobLister     batchv1listers.JobLister
    73  	cronJobLister batchv1listers.CronJobLister
    74  
    75  	jobListerSynced     cache.InformerSynced
    76  	cronJobListerSynced cache.InformerSynced
    77  
    78  	// now is a function that returns current time, done to facilitate unit tests
    79  	now func() time.Time
    80  }
    81  
    82  // NewControllerV2 creates and initializes a new Controller.
    83  func NewControllerV2(ctx context.Context, jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
    84  	logger := klog.FromContext(ctx)
    85  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    86  
    87  	jm := &ControllerV2{
    88  		queue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
    89  		kubeClient:  kubeClient,
    90  		broadcaster: eventBroadcaster,
    91  		recorder:    eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),
    92  
    93  		jobControl:     realJobControl{KubeClient: kubeClient},
    94  		cronJobControl: &realCJControl{KubeClient: kubeClient},
    95  
    96  		jobLister:     jobInformer.Lister(),
    97  		cronJobLister: cronJobsInformer.Lister(),
    98  
    99  		jobListerSynced:     jobInformer.Informer().HasSynced,
   100  		cronJobListerSynced: cronJobsInformer.Informer().HasSynced,
   101  		now:                 time.Now,
   102  	}
   103  
   104  	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   105  		AddFunc:    jm.addJob,
   106  		UpdateFunc: jm.updateJob,
   107  		DeleteFunc: jm.deleteJob,
   108  	})
   109  
   110  	cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   111  		AddFunc: func(obj interface{}) {
   112  			jm.enqueueController(obj)
   113  		},
   114  		UpdateFunc: func(oldObj, newObj interface{}) {
   115  			jm.updateCronJob(logger, oldObj, newObj)
   116  		},
   117  		DeleteFunc: func(obj interface{}) {
   118  			jm.enqueueController(obj)
   119  		},
   120  	})
   121  
   122  	metrics.Register()
   123  
   124  	return jm, nil
   125  }
   126  
   127  // Run starts the main goroutine responsible for watching and syncing jobs.
   128  func (jm *ControllerV2) Run(ctx context.Context, workers int) {
   129  	defer utilruntime.HandleCrash()
   130  
   131  	// Start event processing pipeline.
   132  	jm.broadcaster.StartStructuredLogging(3)
   133  	jm.broadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
   134  	defer jm.broadcaster.Shutdown()
   135  
   136  	defer jm.queue.ShutDown()
   137  
   138  	logger := klog.FromContext(ctx)
   139  	logger.Info("Starting cronjob controller v2")
   140  	defer logger.Info("Shutting down cronjob controller v2")
   141  
   142  	if !cache.WaitForNamedCacheSync("cronjob", ctx.Done(), jm.jobListerSynced, jm.cronJobListerSynced) {
   143  		return
   144  	}
   145  
   146  	for i := 0; i < workers; i++ {
   147  		go wait.UntilWithContext(ctx, jm.worker, time.Second)
   148  	}
   149  
   150  	<-ctx.Done()
   151  }
   152  
   153  func (jm *ControllerV2) worker(ctx context.Context) {
   154  	for jm.processNextWorkItem(ctx) {
   155  	}
   156  }
   157  
   158  func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool {
   159  	key, quit := jm.queue.Get()
   160  	if quit {
   161  		return false
   162  	}
   163  	defer jm.queue.Done(key)
   164  
   165  	requeueAfter, err := jm.sync(ctx, key.(string))
   166  	switch {
   167  	case err != nil:
   168  		utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err))
   169  		jm.queue.AddRateLimited(key)
   170  	case requeueAfter != nil:
   171  		jm.queue.Forget(key)
   172  		jm.queue.AddAfter(key, *requeueAfter)
   173  	}
   174  	return true
   175  }
   176  
   177  func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Duration, error) {
   178  	ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)
   179  	if err != nil {
   180  		return nil, err
   181  	}
   182  	logger := klog.FromContext(ctx)
   183  	cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
   184  	switch {
   185  	case errors.IsNotFound(err):
   186  		// may be cronjob is deleted, don't need to requeue this key
   187  		logger.V(4).Info("CronJob not found, may be it is deleted", "cronjob", klog.KObj(cronJob), "err", err)
   188  		return nil, nil
   189  	case err != nil:
   190  		// for other transient apiserver error requeue with exponential backoff
   191  		return nil, err
   192  	}
   193  
   194  	jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob)
   195  	if err != nil {
   196  		return nil, err
   197  	}
   198  
   199  	// cronJobCopy is used to combine all the updates to a
   200  	// CronJob object and perform an actual update only once.
   201  	cronJobCopy := cronJob.DeepCopy()
   202  
   203  	updateStatusAfterCleanup := jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled)
   204  
   205  	requeueAfter, updateStatusAfterSync, syncErr := jm.syncCronJob(ctx, cronJobCopy, jobsToBeReconciled)
   206  	if syncErr != nil {
   207  		logger.V(2).Info("Error reconciling cronjob", "cronjob", klog.KObj(cronJob), "err", syncErr)
   208  	}
   209  
   210  	// Update the CronJob if needed
   211  	if updateStatusAfterCleanup || updateStatusAfterSync {
   212  		if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
   213  			logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
   214  			return nil, err
   215  		}
   216  	}
   217  
   218  	if requeueAfter != nil {
   219  		logger.V(4).Info("Re-queuing cronjob", "cronjob", klog.KObj(cronJob), "requeueAfter", requeueAfter)
   220  		return requeueAfter, nil
   221  	}
   222  	// this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format
   223  	return nil, syncErr
   224  }
   225  
   226  // resolveControllerRef returns the controller referenced by a ControllerRef,
   227  // or nil if the ControllerRef could not be resolved to a matching controller
   228  // of the correct Kind.
   229  func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batchv1.CronJob {
   230  	// We can't look up by UID, so look up by Name and then verify UID.
   231  	// Don't even try to look up by Name if it's the wrong Kind.
   232  	if controllerRef.Kind != controllerKind.Kind {
   233  		return nil
   234  	}
   235  	cronJob, err := jm.cronJobLister.CronJobs(namespace).Get(controllerRef.Name)
   236  	if err != nil {
   237  		return nil
   238  	}
   239  	if cronJob.UID != controllerRef.UID {
   240  		// The controller we found with this Name is not the same one that the
   241  		// ControllerRef points to.
   242  		return nil
   243  	}
   244  	return cronJob
   245  }
   246  
   247  func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1.CronJob) ([]*batchv1.Job, error) {
   248  	// list all jobs: there may be jobs with labels that don't match the template anymore,
   249  	// but that still have a ControllerRef to the given cronjob
   250  	jobList, err := jm.jobLister.Jobs(cronJob.Namespace).List(labels.Everything())
   251  	if err != nil {
   252  		return nil, err
   253  	}
   254  
   255  	jobsToBeReconciled := []*batchv1.Job{}
   256  
   257  	for _, job := range jobList {
   258  		// If it has a ControllerRef, that's all that matters.
   259  		if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && controllerRef.Name == cronJob.Name {
   260  			// this job is needs to be reconciled
   261  			jobsToBeReconciled = append(jobsToBeReconciled, job)
   262  		}
   263  	}
   264  	return jobsToBeReconciled, nil
   265  }
   266  
   267  // When a job is created, enqueue the controller that manages it and update it's expectations.
   268  func (jm *ControllerV2) addJob(obj interface{}) {
   269  	job := obj.(*batchv1.Job)
   270  	if job.DeletionTimestamp != nil {
   271  		// on a restart of the controller, it's possible a new job shows up in a state that
   272  		// is already pending deletion. Prevent the job from being a creation observation.
   273  		jm.deleteJob(job)
   274  		return
   275  	}
   276  
   277  	// If it has a ControllerRef, that's all that matters.
   278  	if controllerRef := metav1.GetControllerOf(job); controllerRef != nil {
   279  		cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)
   280  		if cronJob == nil {
   281  			return
   282  		}
   283  		jm.enqueueController(cronJob)
   284  		return
   285  	}
   286  }
   287  
   288  // updateJob figures out what CronJob(s) manage a Job when the Job
   289  // is updated and wake them up. If the anything of the Job have changed, we need to
   290  // awaken both the old and new CronJob. old and cur must be *batchv1.Job
   291  // types.
   292  func (jm *ControllerV2) updateJob(old, cur interface{}) {
   293  	curJob := cur.(*batchv1.Job)
   294  	oldJob := old.(*batchv1.Job)
   295  	if curJob.ResourceVersion == oldJob.ResourceVersion {
   296  		// Periodic resync will send update events for all known jobs.
   297  		// Two different versions of the same jobs will always have different RVs.
   298  		return
   299  	}
   300  
   301  	curControllerRef := metav1.GetControllerOf(curJob)
   302  	oldControllerRef := metav1.GetControllerOf(oldJob)
   303  	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   304  	if controllerRefChanged && oldControllerRef != nil {
   305  		// The ControllerRef was changed. Sync the old controller, if any.
   306  		if cronJob := jm.resolveControllerRef(oldJob.Namespace, oldControllerRef); cronJob != nil {
   307  			jm.enqueueController(cronJob)
   308  		}
   309  	}
   310  
   311  	// If it has a ControllerRef, that's all that matters.
   312  	if curControllerRef != nil {
   313  		cronJob := jm.resolveControllerRef(curJob.Namespace, curControllerRef)
   314  		if cronJob == nil {
   315  			return
   316  		}
   317  		jm.enqueueController(cronJob)
   318  		return
   319  	}
   320  }
   321  
   322  func (jm *ControllerV2) deleteJob(obj interface{}) {
   323  	job, ok := obj.(*batchv1.Job)
   324  
   325  	// When a delete is dropped, the relist will notice a job in the store not
   326  	// in the list, leading to the insertion of a tombstone object which contains
   327  	// the deleted key/value. Note that this value might be stale.
   328  	if !ok {
   329  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   330  		if !ok {
   331  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
   332  			return
   333  		}
   334  		job, ok = tombstone.Obj.(*batchv1.Job)
   335  		if !ok {
   336  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Job %#v", obj))
   337  			return
   338  		}
   339  	}
   340  
   341  	controllerRef := metav1.GetControllerOf(job)
   342  	if controllerRef == nil {
   343  		// No controller should care about orphans being deleted.
   344  		return
   345  	}
   346  	cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)
   347  	if cronJob == nil {
   348  		return
   349  	}
   350  	jm.enqueueController(cronJob)
   351  }
   352  
   353  func (jm *ControllerV2) enqueueController(obj interface{}) {
   354  	key, err := controller.KeyFunc(obj)
   355  	if err != nil {
   356  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
   357  		return
   358  	}
   359  
   360  	jm.queue.Add(key)
   361  }
   362  
   363  func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration) {
   364  	key, err := controller.KeyFunc(obj)
   365  	if err != nil {
   366  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
   367  		return
   368  	}
   369  
   370  	jm.queue.AddAfter(key, t)
   371  }
   372  
   373  // updateCronJob re-queues the CronJob for next scheduled time if there is a
   374  // change in spec.schedule otherwise it re-queues it now
   375  func (jm *ControllerV2) updateCronJob(logger klog.Logger, old interface{}, curr interface{}) {
   376  	oldCJ, okOld := old.(*batchv1.CronJob)
   377  	newCJ, okNew := curr.(*batchv1.CronJob)
   378  
   379  	if !okOld || !okNew {
   380  		// typecasting of one failed, handle this better, may be log entry
   381  		return
   382  	}
   383  	// if the change in schedule results in next requeue having to be sooner than it already was,
   384  	// it will be handled here by the queue. If the next requeue is further than previous schedule,
   385  	// the sync loop will essentially be a no-op for the already queued key with old schedule.
   386  	if oldCJ.Spec.Schedule != newCJ.Spec.Schedule || !pointer.StringEqual(oldCJ.Spec.TimeZone, newCJ.Spec.TimeZone) {
   387  		// schedule changed, change the requeue time, pass nil recorder so that syncCronJob will output any warnings
   388  		sched, err := cron.ParseStandard(formatSchedule(newCJ, nil))
   389  		if err != nil {
   390  			// this is likely a user error in defining the spec value
   391  			// we should log the error and not reconcile this cronjob until an update to spec
   392  			logger.V(2).Info("Unparseable schedule for cronjob", "cronjob", klog.KObj(newCJ), "schedule", newCJ.Spec.Schedule, "err", err)
   393  			jm.recorder.Eventf(newCJ, corev1.EventTypeWarning, "UnParseableCronJobSchedule", "unparseable schedule for cronjob: %s", newCJ.Spec.Schedule)
   394  			return
   395  		}
   396  		now := jm.now()
   397  		t := nextScheduleTimeDuration(newCJ, now, sched)
   398  
   399  		jm.enqueueControllerAfter(curr, *t)
   400  		return
   401  	}
   402  
   403  	// other parameters changed, requeue this now and if this gets triggered
   404  	// within deadline, sync loop will work on the CJ otherwise updates will be handled
   405  	// during the next schedule
   406  	// TODO: need to handle the change of spec.JobTemplate.metadata.labels explicitly
   407  	//   to cleanup jobs with old labels
   408  	jm.enqueueController(curr)
   409  }
   410  
// syncCronJob reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "cronJob" should be included in "jobs".
// The current time is obtained from jm.now, which unit tests can replace.
//
// Status fields of cronJob (Active, LastScheduleTime, LastSuccessfulTime) are
// modified in place. Persisting them is left to the caller.
//
// It returns:
//   - the delay after which the CronJob should be synced again, or nil when no
//     requeue is scheduled. Nil is returned when the CronJob is being deleted,
//     is suspended, has an invalid time zone or schedule, when a Job with the
//     target name already existed and needed no status change, or on error;
//   - true when cronJob's status changed and an update to the api-server is needed;
//   - an error for failures that should be retried.
//
// NOTE(review): when a running Job is missing from cronJob's Active list but
// present in a freshly fetched copy, the local variable cronJob is re-pointed
// at that copy. Status changes made afterwards land on the copy, not on the
// object the caller passed in (which sync later persists) — confirm intended.
func (jm *ControllerV2) syncCronJob(
	ctx context.Context,
	cronJob *batchv1.CronJob,
	jobs []*batchv1.Job) (*time.Duration, bool, error) {

	now := jm.now()
	updateStatus := false

	// Pass 1: reconcile each owned Job against Status.Active, and remember
	// which Job UIDs exist in the lister (used by pass 2).
	childrenJobs := make(map[types.UID]bool)
	for _, j := range jobs {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(cronJob, j.ObjectMeta.UID)
		if !found && !IsJobFinished(j) {
			// Our view of the CronJob may be stale: re-fetch it through
			// cronJobControl before treating the running Job as unexpected.
			cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
			if err != nil {
				return nil, updateStatus, err
			}
			if inActiveList(cjCopy, j.ObjectMeta.UID) {
				// NOTE(review): re-points the local variable only (see function doc).
				cronJob = cjCopy
				continue
			}
			jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
			// We found an unfinished job that has us as the parent, but it is not in our Active list.
			// This could happen if we crashed right after creating the Job and before updating the status,
			// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
			// a job that they wanted us to adopt.
		} else if found && IsJobFinished(j) {
			// A tracked Job has finished: drop it from the Active list.
			_, status := getFinishedStatus(j)
			deleteFromActiveList(cronJob, j.ObjectMeta.UID)
			jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
			updateStatus = true
		} else if IsJobSucceeded(j) {
			// A job does not have to be in the active list: as long as it completed
			// successfully, keep LastSuccessfulTime at the latest CompletionTime seen.
			if cronJob.Status.LastSuccessfulTime == nil {
				cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
				updateStatus = true
			}
			if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cronJob.Status.LastSuccessfulTime.Time) {
				cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
				updateStatus = true
			}
		}
	}

	// Pass 2: remove any job reference from the active list if the corresponding job does not exist any more.
	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
	// job running.
	for _, j := range cronJob.Status.Active {
		_, found := childrenJobs[j.UID]
		if found {
			continue
		}
		// Explicitly try to get the job from api-server to avoid a slow watch not able to update
		// the job lister on time, giving an unwanted miss
		_, err := jm.jobControl.GetJob(j.Namespace, j.Name)
		switch {
		case errors.IsNotFound(err):
			// The job is actually missing, delete from active list and schedule a new one if within
			// deadline
			jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(cronJob, j.UID)
			updateStatus = true
		case err != nil:
			return nil, updateStatus, err
		}
		// the job is missing in the lister but found in api-server
	}

	if cronJob.DeletionTimestamp != nil {
		// The CronJob is being deleted.
		// Don't do anything other than updating status.
		return nil, updateStatus, nil
	}

	logger := klog.FromContext(ctx)
	// An explicitly set time zone must be loadable before any schedule is computed.
	if cronJob.Spec.TimeZone != nil {
		timeZone := pointer.StringDeref(cronJob.Spec.TimeZone, "")
		if _, err := time.LoadLocation(timeZone); err != nil {
			logger.V(4).Info("Not starting job because timeZone is invalid", "cronjob", klog.KObj(cronJob), "timeZone", timeZone, "err", err)
			jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
			return nil, updateStatus, nil
		}
	}

	if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
		logger.V(4).Info("Not starting job because the cron is suspended", "cronjob", klog.KObj(cronJob))
		return nil, updateStatus, nil
	}

	// Unlike updateCronJob (which passes nil), the real recorder is handed to
	// formatSchedule here, so schedule warnings are reported from this path.
	sched, err := cron.ParseStandard(formatSchedule(cronJob, jm.recorder))
	if err != nil {
		// this is likely a user error in defining the spec value
		// we should log the error and not reconcile this cronjob until an update to spec
		logger.V(2).Info("Unparseable schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
		jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err)
		return nil, updateStatus, nil
	}

	// scheduledTime is the schedule time to act on now, or nil when there is no
	// unmet start time (see the branch below).
	scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
	if err != nil {
		// this is likely a user error in defining the spec value
		// we should log the error and not reconcile this cronjob until an update to spec
		logger.V(2).Info("Invalid schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
		jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
		return nil, updateStatus, nil
	}
	if scheduledTime == nil {
		// No unmet start time: just requeue for the next scheduled time.
		// The only time this should happen is if the queue is filled after a restart.
		// Otherwise, the queue is always supposed to trigger the sync function at
		// the scheduled time, which will give at least 1 unmet time schedule.
		logger.V(4).Info("No unmet start times", "cronjob", klog.KObj(cronJob))
		t := nextScheduleTimeDuration(cronJob, now, sched)
		return t, updateStatus, nil
	}

	// The start is too late when scheduledTime + StartingDeadlineSeconds is
	// already in the past. Without a deadline it is never too late.
	tooLate := false
	if cronJob.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
	}
	if tooLate {
		logger.V(4).Info("Missed starting window", "cronjob", klog.KObj(cronJob))
		jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))

		// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
		// the miss every cycle.  In order to avoid sending multiple events, and to avoid processing
		// the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
		// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
		// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
		// and event the next time we process it, and also so the user looking at the status
		// can see easily that there was a missed execution.
		t := nextScheduleTimeDuration(cronJob, now, sched)
		return t, updateStatus, nil
	}
	// Skip a scheduled time that was already handled: either an active Job has
	// the name derived from scheduledTime, or LastScheduleTime already equals it.
	if inActiveListByName(cronJob, &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      getJobName(cronJob, *scheduledTime),
			Namespace: cronJob.Namespace,
		}}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
		logger.V(4).Info("Not starting job because the scheduled time is already processed", "cronjob", klog.KObj(cronJob), "schedule", scheduledTime)
		t := nextScheduleTimeDuration(cronJob, now, sched)
		return t, updateStatus, nil
	}
	if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
		// Regardless which source of information we use for the set of active jobs,
		// there is some risk that we won't see an active job when there is one.
		// (because we haven't seen the status update to the SJ or the created pod).
		// So it is theoretically possible to have concurrency with Forbid.
		// As long the as the invocations are "far enough apart in time", this usually won't happen.
		//
		// TODO: for Forbid, we could use the same name for every execution, as a lock.
		// With replace, we could use a name that is deterministic per execution time.
		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
		logger.V(4).Info("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KObj(cronJob))
		jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
		t := nextScheduleTimeDuration(cronJob, now, sched)
		return t, updateStatus, nil
	}
	// Replace: delete every currently active Job before starting the new one.
	// The sync is aborted (with an error) if any Job cannot be fetched or deleted.
	if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
		for _, j := range cronJob.Status.Active {
			logger.V(4).Info("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
			job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
			if err != nil {
				jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return nil, updateStatus, err
			}
			if !deleteJob(logger, cronJob, job, jm.jobControl, jm.recorder) {
				return nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
			}
			updateStatus = true
		}
	}

	// Create the Job for scheduledTime. An AlreadyExists error is not fatal: it
	// may be a Job we created earlier without recording it in the status.
	jobAlreadyExists := false
	jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
	if err != nil {
		logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob))
		return nil, updateStatus, err
	}
	jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
	switch {
	case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
		// if the namespace is being terminated, we don't have to do
		// anything because any creation will fail
		return nil, updateStatus, err
	case errors.IsAlreadyExists(err):
		// If the job is created by other actor, assume it has updated the cronjob status accordingly.
		// However, if the job was created by cronjob controller, this means we've previously created the job
		// but failed to update the active list in the status, in which case we should reattempt to add the job
		// into the active list and update the status.
		jobAlreadyExists = true
		job, err := jm.jobControl.GetJob(jobReq.GetNamespace(), jobReq.GetName())
		if err != nil {
			return nil, updateStatus, err
		}
		jobResp = job

		// check that this job is owned by cronjob controller, otherwise do nothing and assume external controller
		// is updating the status.
		if !metav1.IsControlledBy(job, cronJob) {
			return nil, updateStatus, nil
		}

		// Recheck if the job is missing from the active list before attempting to update the status again.
		found := inActiveList(cronJob, job.ObjectMeta.UID)
		if found {
			return nil, updateStatus, nil
		}
	case err != nil:
		// default error handling
		jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		return nil, updateStatus, err
	}

	// CronJobCreationSkew records how long after scheduledTime the Job was
	// actually created; it is only observed for Jobs created by this call.
	if jobAlreadyExists {
		logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
	} else {
		metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
		logger.V(4).Info("Created Job", "job", klog.KObj(jobResp), "cronjob", klog.KObj(cronJob))
		jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
	}

	// ------------------------------------------------------------------ //

	// If this process restarts at this point (after posting a job, but
	// before updating the status), then we might try to start the job on
	// the next time.  Actually, if we re-list the SJs and Jobs on the next
	// iteration of syncAll, we might not see our own status update, and
	// then post one again.  So, we need to use the job name as a lock to
	// prevent us from making the job twice (name the job with hash of its
	// scheduled time).

	// Add the just-started job to the status list.
	jobRef, err := getRef(jobResp)
	if err != nil {
		logger.V(2).Info("Unable to make object reference", "cronjob", klog.KObj(cronJob), "err", err)
		return nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
	}
	cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
	cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
	updateStatus = true

	t := nextScheduleTimeDuration(cronJob, now, sched)
	return t, updateStatus, nil
}
   660  
   661  func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string {
   662  	return fmt.Sprintf("%s-%d", cj.Name, getTimeHashInMinutes(scheduledTime))
   663  }
   664  
   665  // cleanupFinishedJobs cleanups finished jobs created by a CronJob
   666  // It returns a bool to indicate an update to api-server is needed
   667  func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) bool {
   668  	// If neither limits are active, there is no need to do anything.
   669  	if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
   670  		return false
   671  	}
   672  
   673  	updateStatus := false
   674  	failedJobs := []*batchv1.Job{}
   675  	successfulJobs := []*batchv1.Job{}
   676  
   677  	for _, job := range js {
   678  		isFinished, finishedStatus := jm.getFinishedStatus(job)
   679  		if isFinished && finishedStatus == batchv1.JobComplete {
   680  			successfulJobs = append(successfulJobs, job)
   681  		} else if isFinished && finishedStatus == batchv1.JobFailed {
   682  			failedJobs = append(failedJobs, job)
   683  		}
   684  	}
   685  
   686  	if cj.Spec.SuccessfulJobsHistoryLimit != nil &&
   687  		jm.removeOldestJobs(ctx, cj,
   688  			successfulJobs,
   689  			*cj.Spec.SuccessfulJobsHistoryLimit) {
   690  		updateStatus = true
   691  	}
   692  
   693  	if cj.Spec.FailedJobsHistoryLimit != nil &&
   694  		jm.removeOldestJobs(ctx, cj,
   695  			failedJobs,
   696  			*cj.Spec.FailedJobsHistoryLimit) {
   697  		updateStatus = true
   698  	}
   699  
   700  	return updateStatus
   701  }
   702  
   703  func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
   704  	for _, c := range j.Status.Conditions {
   705  		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
   706  			return true, c.Type
   707  		}
   708  	}
   709  	return false, ""
   710  }
   711  
   712  // removeOldestJobs removes the oldest jobs from a list of jobs
   713  func (jm *ControllerV2) removeOldestJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) bool {
   714  	updateStatus := false
   715  	numToDelete := len(js) - int(maxJobs)
   716  	if numToDelete <= 0 {
   717  		return updateStatus
   718  	}
   719  	logger := klog.FromContext(ctx)
   720  	logger.V(4).Info("Cleaning up jobs from CronJob list", "deletejobnum", numToDelete, "jobnum", len(js), "cronjob", klog.KObj(cj))
   721  
   722  	sort.Sort(byJobStartTime(js))
   723  	for i := 0; i < numToDelete; i++ {
   724  		logger.V(4).Info("Removing job from CronJob list", "job", js[i].Name, "cronjob", klog.KObj(cj))
   725  		if deleteJob(logger, cj, js[i], jm.jobControl, jm.recorder) {
   726  			updateStatus = true
   727  		}
   728  	}
   729  	return updateStatus
   730  }
   731  
   732  // deleteJob reaps a job, deleting the job, the pods and the reference in the active list
   733  func deleteJob(logger klog.Logger, cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
   734  	// delete the job itself...
   735  	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
   736  		recorder.Eventf(cj, corev1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
   737  		logger.Error(err, "Error deleting job from cronjob", "job", klog.KObj(job), "cronjob", klog.KObj(cj))
   738  		return false
   739  	}
   740  	// ... and its reference from active list
   741  	deleteFromActiveList(cj, job.ObjectMeta.UID)
   742  	recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
   743  
   744  	return true
   745  }
   746  
   747  func getRef(object runtime.Object) (*corev1.ObjectReference, error) {
   748  	return ref.GetReference(scheme.Scheme, object)
   749  }
   750  
   751  func formatSchedule(cj *batchv1.CronJob, recorder record.EventRecorder) string {
   752  	if strings.Contains(cj.Spec.Schedule, "TZ") {
   753  		if recorder != nil {
   754  			recorder.Eventf(cj, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", cj.Spec.Schedule)
   755  		}
   756  
   757  		return cj.Spec.Schedule
   758  	}
   759  
   760  	if cj.Spec.TimeZone != nil {
   761  		if _, err := time.LoadLocation(*cj.Spec.TimeZone); err != nil {
   762  			return cj.Spec.Schedule
   763  		}
   764  
   765  		return fmt.Sprintf("TZ=%s %s", *cj.Spec.TimeZone, cj.Spec.Schedule)
   766  	}
   767  
   768  	return cj.Spec.Schedule
   769  }
   770  

View as plain text