...

Source file src/k8s.io/kubernetes/pkg/controller/cronjob/utils.go

Documentation: k8s.io/kubernetes/pkg/controller/cronjob

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cronjob
    18  
    19  import (
    20  	"fmt"
    21  	"time"
    22  
    23  	"github.com/robfig/cron/v3"
    24  	"k8s.io/utils/pointer"
    25  
    26  	batchv1 "k8s.io/api/batch/v1"
    27  	corev1 "k8s.io/api/core/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/client-go/tools/record"
    33  	"k8s.io/klog/v2"
    34  	"k8s.io/kubernetes/pkg/features"
    35  )
    36  
    37  // Utilities for dealing with Jobs and CronJobs and time.
    38  
    39  type missedSchedulesType int
    40  
    41  const (
    42  	noneMissed missedSchedulesType = iota
    43  	fewMissed
    44  	manyMissed
    45  )
    46  
    47  func (e missedSchedulesType) String() string {
    48  	switch e {
    49  	case noneMissed:
    50  		return "none"
    51  	case fewMissed:
    52  		return "few"
    53  	case manyMissed:
    54  		return "many"
    55  	default:
    56  		return fmt.Sprintf("unknown(%d)", int(e))
    57  	}
    58  }
    59  
    60  // inActiveList checks if cronjob's .status.active has a job with the same UID.
    61  func inActiveList(cj *batchv1.CronJob, uid types.UID) bool {
    62  	for _, j := range cj.Status.Active {
    63  		if j.UID == uid {
    64  			return true
    65  		}
    66  	}
    67  	return false
    68  }
    69  
    70  // inActiveListByName checks if cronjob's status.active has a job with the same
    71  // name and namespace.
    72  func inActiveListByName(cj *batchv1.CronJob, job *batchv1.Job) bool {
    73  	for _, j := range cj.Status.Active {
    74  		if j.Name == job.Name && j.Namespace == job.Namespace {
    75  			return true
    76  		}
    77  	}
    78  	return false
    79  }
    80  
    81  func deleteFromActiveList(cj *batchv1.CronJob, uid types.UID) {
    82  	if cj == nil {
    83  		return
    84  	}
    85  	// TODO: @alpatel the memory footprint can may be reduced here by
    86  	//  cj.Status.Active = append(cj.Status.Active[:indexToRemove], cj.Status.Active[indexToRemove:]...)
    87  	newActive := []corev1.ObjectReference{}
    88  	for _, j := range cj.Status.Active {
    89  		if j.UID != uid {
    90  			newActive = append(newActive, j)
    91  		}
    92  	}
    93  	cj.Status.Active = newActive
    94  }
    95  
// mostRecentScheduleTime returns:
//   - the earliest time considered: the CronJob's last schedule time if set,
//     otherwise its creation time; when includeStartingDeadlineSeconds is true
//     and .spec.startingDeadlineSeconds is set, this is moved forward to
//     now minus that deadline if that is later,
//   - the most recent time a Job should be created, or nil if the first
//     schedule time after the earliest time is still after now,
//   - a value indicating whether no, a few, or many (more than 100) schedules
//     were missed; it stays noneMissed when fewer than two schedule times have
//     passed since the earliest time,
//   - an error in an edge case where the schedule specification is grammatically correct,
//     but logically doesn't make sense (31st day for months with only 30 days, for example).
func mostRecentScheduleTime(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, includeStartingDeadlineSeconds bool) (time.Time, *time.Time, missedSchedulesType, error) {
	earliestTime := cj.ObjectMeta.CreationTimestamp.Time
	missedSchedules := noneMissed
	if cj.Status.LastScheduleTime != nil {
		earliestTime = cj.Status.LastScheduleTime.Time
	}
	if includeStartingDeadlineSeconds && cj.Spec.StartingDeadlineSeconds != nil {
		// The controller is not going to schedule anything older than this point.
		schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))

		if schedulingDeadline.After(earliestTime) {
			earliestTime = schedulingDeadline
		}
	}

	// t1 and t2 are the first two schedule times after earliestTime.
	t1 := schedule.Next(earliestTime)
	t2 := schedule.Next(t1)

	// Nothing is due yet.
	if now.Before(t1) {
		return earliestTime, nil, missedSchedules, nil
	}
	// Exactly one schedule time has passed: t1 is the one to run.
	if now.Before(t2) {
		return earliestTime, &t1, missedSchedules, nil
	}

	// It is possible for cron.ParseStandard("59 23 31 2 *") to return an invalid schedule:
	// minute - 59, hour - 23, dom - 31, month - 2, and dow is optional; clearly 31 is invalid.
	// In this case timeBetweenTwoSchedules will be 0, and we error out on the invalid schedule.
	timeBetweenTwoSchedules := int64(t2.Sub(t1).Round(time.Second).Seconds())
	if timeBetweenTwoSchedules < 1 {
		return earliestTime, nil, missedSchedules, fmt.Errorf("time difference between two schedules is less than 1 second")
	}
	// The number of missed schedules is a rough approximation: take the
	// interval between two consecutive schedules (t1 and t2) and count how
	// many such intervals fit between t1 and now.
	timeElapsed := int64(now.Sub(t1).Seconds())
	numberOfMissedSchedules := (timeElapsed / timeBetweenTwoSchedules) + 1

	var mostRecentTime time.Time
	// To get the most recent time accurate for regular schedules and the ones
	// specified with the @every form, we first need to calculate the potential
	// earliest time by multiplying the initial number of missed schedules by its
	// interval. This is critical to ensure @every starts at the correct time, and
	// explains the numberOfMissedSchedules-1. The additional -1 goes back in time
	// one more interval and lets the cron library calculate a proper schedule for
	// cases where the schedule is not consistent, for example something like
	// 30 6-16/4 * * 1-5
	potentialEarliest := t1.Add(time.Duration((numberOfMissedSchedules-1-1)*timeBetweenTwoSchedules) * time.Second)
	// Walk forward from potentialEarliest and keep the last schedule time that
	// is not after now.
	for t := schedule.Next(potentialEarliest); !t.After(now); t = schedule.Next(t) {
		mostRecentTime = t
	}

	// An object might miss several starts. For example, if
	// controller gets wedged on friday at 5:01pm when everyone has
	// gone home, and someone comes in on tuesday AM and discovers
	// the problem and restarts the controller, then all the hourly
	// jobs, more than 80 of them for one hourly cronJob, should
	// all start running with no further intervention (if the cronJob
	// allows concurrency and late starts).
	//
	// However, if there is a bug somewhere, or incorrect clock
	// on controller's server or apiservers (for setting creationTimestamp)
	// then there could be so many missed start times (it could be off
	// by decades or more), that it would eat up all the CPU and memory
	// of this controller. In that case, we want to not try to list
	// all the missed start times.
	//
	// I've somewhat arbitrarily picked 100, as more than 80,
	// but less than "lots".
	switch {
	case numberOfMissedSchedules > 100:
		missedSchedules = manyMissed
	// Still inform the caller when only a few were missed.
	case numberOfMissedSchedules > 0:
		missedSchedules = fewMissed
	}

	// The loop above found no schedule time at or before now.
	if mostRecentTime.IsZero() {
		return earliestTime, nil, missedSchedules, nil
	}
	return earliestTime, &mostRecentTime, missedSchedules, nil
}
   184  
   185  // nextScheduleTimeDuration returns the time duration to requeue based on
   186  // the schedule and last schedule time. It adds a 100ms padding to the next requeue to account
   187  // for Network Time Protocol(NTP) time skews. If the time drifts the adjustment, which in most
   188  // realistic cases should be around 100s, the job will still be executed without missing
   189  // the schedule.
   190  func nextScheduleTimeDuration(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule) *time.Duration {
   191  	earliestTime, mostRecentTime, missedSchedules, err := mostRecentScheduleTime(cj, now, schedule, false)
   192  	if err != nil {
   193  		// we still have to requeue at some point, so aim for the next scheduling slot from now
   194  		mostRecentTime = &now
   195  	} else if mostRecentTime == nil {
   196  		if missedSchedules == noneMissed {
   197  			// no missed schedules since earliestTime
   198  			mostRecentTime = &earliestTime
   199  		} else {
   200  			// if there are missed schedules since earliestTime, always use now
   201  			mostRecentTime = &now
   202  		}
   203  	}
   204  
   205  	t := schedule.Next(*mostRecentTime).Add(nextScheduleDelta).Sub(now)
   206  	return &t
   207  }
   208  
   209  // nextScheduleTime returns the time.Time of the next schedule after the last scheduled
   210  // and before now, or nil if no unmet schedule times, and an error.
   211  // If there are too many (>100) unstarted times, it will also record a warning.
   212  func nextScheduleTime(logger klog.Logger, cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
   213  	_, mostRecentTime, missedSchedules, err := mostRecentScheduleTime(cj, now, schedule, true)
   214  
   215  	if mostRecentTime == nil || mostRecentTime.After(now) {
   216  		return nil, err
   217  	}
   218  
   219  	if missedSchedules == manyMissed {
   220  		recorder.Eventf(cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times. Set or decrease .spec.startingDeadlineSeconds or check clock skew")
   221  		logger.Info("too many missed times", "cronjob", klog.KObj(cj))
   222  	}
   223  
   224  	return mostRecentTime, err
   225  }
   226  
   227  func copyLabels(template *batchv1.JobTemplateSpec) labels.Set {
   228  	l := make(labels.Set)
   229  	for k, v := range template.Labels {
   230  		l[k] = v
   231  	}
   232  	return l
   233  }
   234  
   235  func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set {
   236  	a := make(labels.Set)
   237  	for k, v := range template.Annotations {
   238  		a[k] = v
   239  	}
   240  	return a
   241  }
   242  
   243  // getJobFromTemplate2 makes a Job from a CronJob. It converts the unix time into minutes from
   244  // epoch time and concatenates that to the job name, because the cronjob_controller v2 has the lowest
   245  // granularity of 1 minute for scheduling job.
   246  func getJobFromTemplate2(cj *batchv1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
   247  	labels := copyLabels(&cj.Spec.JobTemplate)
   248  	annotations := copyAnnotations(&cj.Spec.JobTemplate)
   249  	// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
   250  	name := getJobName(cj, scheduledTime)
   251  
   252  	if utilfeature.DefaultFeatureGate.Enabled(features.CronJobsScheduledAnnotation) {
   253  
   254  		timeZoneLocation, err := time.LoadLocation(pointer.StringDeref(cj.Spec.TimeZone, ""))
   255  		if err != nil {
   256  			return nil, err
   257  		}
   258  		// Append job creation timestamp to the cronJob annotations. The time will be in RFC3339 form.
   259  		annotations[batchv1.CronJobScheduledTimestampAnnotation] = scheduledTime.In(timeZoneLocation).Format(time.RFC3339)
   260  	}
   261  
   262  	job := &batchv1.Job{
   263  		ObjectMeta: metav1.ObjectMeta{
   264  			Labels:            labels,
   265  			Annotations:       annotations,
   266  			Name:              name,
   267  			CreationTimestamp: metav1.Time{Time: scheduledTime},
   268  			OwnerReferences:   []metav1.OwnerReference{*metav1.NewControllerRef(cj, controllerKind)},
   269  		},
   270  	}
   271  	cj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
   272  	return job, nil
   273  }
   274  
   275  // getTimeHash returns Unix Epoch Time in minutes
   276  func getTimeHashInMinutes(scheduledTime time.Time) int64 {
   277  	return scheduledTime.Unix() / 60
   278  }
   279  
   280  func getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
   281  	for _, c := range j.Status.Conditions {
   282  		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
   283  			return true, c.Type
   284  		}
   285  	}
   286  	return false, ""
   287  }
   288  
   289  // IsJobFinished returns whether or not a job has completed successfully or failed.
   290  func IsJobFinished(j *batchv1.Job) bool {
   291  	isFinished, _ := getFinishedStatus(j)
   292  	return isFinished
   293  }
   294  
   295  // IsJobSucceeded returns whether a job has completed successfully.
   296  func IsJobSucceeded(j *batchv1.Job) bool {
   297  	for _, c := range j.Status.Conditions {
   298  		if c.Type == batchv1.JobComplete && c.Status == corev1.ConditionTrue {
   299  			return true
   300  		}
   301  	}
   302  	return false
   303  }
   304  
   305  // byJobStartTime sorts a list of jobs by start timestamp, using their names as a tie breaker.
   306  type byJobStartTime []*batchv1.Job
   307  
   308  func (o byJobStartTime) Len() int      { return len(o) }
   309  func (o byJobStartTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
   310  
   311  func (o byJobStartTime) Less(i, j int) bool {
   312  	if o[i].Status.StartTime == nil && o[j].Status.StartTime != nil {
   313  		return false
   314  	}
   315  	if o[i].Status.StartTime != nil && o[j].Status.StartTime == nil {
   316  		return true
   317  	}
   318  	if o[i].Status.StartTime.Equal(o[j].Status.StartTime) {
   319  		return o[i].Name < o[j].Name
   320  	}
   321  	return o[i].Status.StartTime.Before(o[j].Status.StartTime)
   322  }
   323  

View as plain text