...

Source file src/k8s.io/kubernetes/test/e2e/apps/cronjob.go

Documentation: k8s.io/kubernetes/test/e2e/apps

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apps
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"time"
    24  
    25  	"github.com/onsi/ginkgo/v2"
    26  	"github.com/onsi/gomega"
    27  	"github.com/onsi/gomega/format"
    28  
    29  	batchv1 "k8s.io/api/batch/v1"
    30  	v1 "k8s.io/api/core/v1"
    31  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    34  	"k8s.io/apimachinery/pkg/runtime/schema"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/apimachinery/pkg/watch"
    38  	clientset "k8s.io/client-go/kubernetes"
    39  	"k8s.io/client-go/kubernetes/scheme"
    40  	"k8s.io/client-go/util/retry"
    41  	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
    42  	"k8s.io/kubernetes/pkg/controller/job"
    43  	"k8s.io/kubernetes/test/e2e/framework"
    44  	e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
    45  	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
    46  	imageutils "k8s.io/kubernetes/test/utils/image"
    47  	admissionapi "k8s.io/pod-security-admission/api"
    48  )
    49  
    50  const (
    51  	// How long to wait for a cronjob
    52  	cronJobTimeout = 5 * time.Minute
    53  )
    54  
    55  var _ = SIGDescribe("CronJob", func() {
    56  	f := framework.NewDefaultFramework("cronjob")
    57  	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
    58  
    59  	sleepCommand := []string{"sleep", "300"}
    60  
    61  	// Pod will complete instantly
    62  	successCommand := []string{"/bin/true"}
    63  	failureCommand := []string{"/bin/false"}
    64  
    65  	/*
    66  	   Release: v1.21
    67  	   Testname: CronJob AllowConcurrent
    68  	   Description: CronJob MUST support AllowConcurrent policy, allowing to run multiple jobs at the same time.
    69  	*/
    70  	framework.ConformanceIt("should schedule multiple jobs concurrently", func(ctx context.Context) {
    71  		ginkgo.By("Creating a cronjob")
    72  		cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1.AllowConcurrent,
    73  			sleepCommand, nil, nil)
    74  		cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
    75  		framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
    76  
    77  		ginkgo.By("Ensuring more than one job is running at a time")
    78  		err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 2)
    79  		framework.ExpectNoError(err, "Failed to wait for active jobs in CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
    80  
    81  		ginkgo.By("Ensuring at least two running jobs exists by listing jobs explicitly")
    82  		jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
    83  		framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
    84  		activeJobs, _ := filterActiveJobs(jobs)
    85  		gomega.Expect(len(activeJobs)).To(gomega.BeNumerically(">=", 2))
    86  
    87  		ginkgo.By("Removing cronjob")
    88  		err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
    89  		framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
    90  	})
    91  
    92  	/*
    93  	   Release: v1.21
    94  	   Testname: CronJob Suspend
    95  	   Description: CronJob MUST support suspension, which suppresses creation of new jobs.
    96  	*/
    97  	framework.ConformanceIt("should not schedule jobs when suspended", f.WithSlow(), func(ctx context.Context) {
    98  		ginkgo.By("Creating a suspended cronjob")
    99  		cronJob := newTestCronJob("suspended", "*/1 * * * ?", batchv1.AllowConcurrent,
   100  			sleepCommand, nil, nil)
   101  		t := true
   102  		cronJob.Spec.Suspend = &t
   103  		cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   104  		framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
   105  
   106  		ginkgo.By("Ensuring no jobs are scheduled")
   107  		gomega.Consistently(ctx, framework.GetObject(f.ClientSet.BatchV1().CronJobs(f.Namespace.Name).Get, cronJob.Name, metav1.GetOptions{})).WithPolling(framework.Poll).WithTimeout(cronJobTimeout).
   108  			Should(gomega.HaveField("Status.Active", gomega.BeEmpty()))
   109  
   110  		ginkgo.By("Ensuring no job exists by listing jobs explicitly")
   111  		jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
   112  		framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
   113  		gomega.Expect(jobs.Items).To(gomega.BeEmpty())
   114  
   115  		ginkgo.By("Removing cronjob")
   116  		err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   117  		framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
   118  	})
   119  
   120  	/*
   121  	   Release: v1.21
   122  	   Testname: CronJob ForbidConcurrent
   123  	   Description: CronJob MUST support ForbidConcurrent policy, allowing to run single, previous job at the time.
   124  	*/
   125  	framework.ConformanceIt("should not schedule new jobs when ForbidConcurrent", f.WithSlow(), func(ctx context.Context) {
   126  		ginkgo.By("Creating a ForbidConcurrent cronjob")
   127  		cronJob := newTestCronJob("forbid", "*/1 * * * ?", batchv1.ForbidConcurrent,
   128  			sleepCommand, nil, nil)
   129  		cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   130  		framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
   131  
   132  		ginkgo.By("Ensuring a job is scheduled")
   133  		err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
   134  		framework.ExpectNoError(err, "Failed to schedule CronJob %s", cronJob.Name)
   135  
   136  		ginkgo.By("Ensuring exactly one is scheduled")
   137  		cronJob, err = getCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   138  		framework.ExpectNoError(err, "Failed to get CronJob %s", cronJob.Name)
   139  		gomega.Expect(cronJob.Status.Active).Should(gomega.HaveLen(1))
   140  
   141  		ginkgo.By("Ensuring exactly one running job exists by listing jobs explicitly")
   142  		jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
   143  		framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
   144  		activeJobs, _ := filterActiveJobs(jobs)
   145  		gomega.Expect(activeJobs).To(gomega.HaveLen(1))
   146  
   147  		ginkgo.By("Ensuring no more jobs are scheduled")
   148  		gomega.Eventually(ctx, framework.GetObject(f.ClientSet.BatchV1().CronJobs(f.Namespace.Name).Get, cronJob.Name, metav1.GetOptions{})).WithPolling(framework.Poll).WithTimeout(cronJobTimeout).
   149  			Should(framework.MakeMatcher(func(cj *batchv1.CronJob) (func() string, error) {
   150  				if len(cj.Status.Active) < 2 {
   151  					return nil, nil
   152  				}
   153  				return func() string {
   154  					return fmt.Sprintf("unexpect active job number: %d\n", len(cj.Status.Active))
   155  				}, nil
   156  			}))
   157  
   158  		ginkgo.By("Removing cronjob")
   159  		err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   160  		framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
   161  	})
   162  
   163  	/*
   164  	   Release: v1.21
   165  	   Testname: CronJob ReplaceConcurrent
   166  	   Description: CronJob MUST support ReplaceConcurrent policy, allowing to run single, newer job at the time.
   167  	*/
   168  	framework.ConformanceIt("should replace jobs when ReplaceConcurrent", func(ctx context.Context) {
   169  		ginkgo.By("Creating a ReplaceConcurrent cronjob")
   170  		cronJob := newTestCronJob("replace", "*/1 * * * ?", batchv1.ReplaceConcurrent,
   171  			sleepCommand, nil, nil)
   172  		cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   173  		framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
   174  
   175  		ginkgo.By("Ensuring a job is scheduled")
   176  		err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
   177  		framework.ExpectNoError(err, "Failed to schedule CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
   178  
   179  		ginkgo.By("Ensuring exactly one is scheduled")
   180  		cronJob, err = getCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   181  		framework.ExpectNoError(err, "Failed to get CronJob %s", cronJob.Name)
   182  		gomega.Expect(cronJob.Status.Active).Should(gomega.HaveLen(1))
   183  
   184  		ginkgo.By("Ensuring exactly one running job exists by listing jobs explicitly")
   185  		jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
   186  		framework.ExpectNoError(err, "Failed to list the jobs in namespace %s", f.Namespace.Name)
   187  		activeJobs, _ := filterActiveJobs(jobs)
   188  		gomega.Expect(activeJobs).To(gomega.HaveLen(1))
   189  
   190  		ginkgo.By("Ensuring the job is replaced with a new one")
   191  		err = waitForJobReplaced(ctx, f.ClientSet, f.Namespace.Name, jobs.Items[0].Name)
   192  		framework.ExpectNoError(err, "Failed to replace CronJob %s in namespace %s", jobs.Items[0].Name, f.Namespace.Name)
   193  
   194  		ginkgo.By("Removing cronjob")
   195  		err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   196  		framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
   197  	})
   198  
   199  	ginkgo.It("should be able to schedule after more than 100 missed schedule", func(ctx context.Context) {
   200  		ginkgo.By("Creating a cronjob")
   201  		cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1.ForbidConcurrent,
   202  			sleepCommand, nil, nil)
   203  		creationTime := time.Now().Add(-99 * 24 * time.Hour)
   204  		lastScheduleTime := creationTime.Add(1 * 24 * time.Hour)
   205  		cronJob.CreationTimestamp = metav1.Time{Time: creationTime}
   206  		cronJob.Status.LastScheduleTime = &metav1.Time{Time: lastScheduleTime}
   207  		cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   208  		framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
   209  
   210  		ginkgo.By("Ensuring one job is running")
   211  		err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
   212  		framework.ExpectNoError(err, "Failed to wait for active jobs in CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
   213  
   214  		ginkgo.By("Ensuring at least one running jobs exists by listing jobs explicitly")
   215  		jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
   216  		framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
   217  		activeJobs, _ := filterActiveJobs(jobs)
   218  		gomega.Expect(activeJobs).ToNot(gomega.BeEmpty())
   219  
   220  		ginkgo.By("Removing cronjob")
   221  		err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   222  		framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
   223  	})
   224  
   225  	// shouldn't give us unexpected warnings
   226  	ginkgo.It("should not emit unexpected warnings", func(ctx context.Context) {
   227  		ginkgo.By("Creating a cronjob")
   228  		cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1.AllowConcurrent,
   229  			nil, nil, nil)
   230  		cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   231  		framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
   232  
   233  		ginkgo.By("Ensuring at least two jobs and at least one finished job exists by listing jobs explicitly")
   234  		err = waitForJobsAtLeast(ctx, f.ClientSet, f.Namespace.Name, 2)
   235  		framework.ExpectNoError(err, "Failed to ensure at least two job exists in namespace %s", f.Namespace.Name)
   236  		err = waitForAnyFinishedJob(ctx, f.ClientSet, f.Namespace.Name)
   237  		framework.ExpectNoError(err, "Failed to ensure at least on finished job exists in namespace %s", f.Namespace.Name)
   238  
   239  		ginkgo.By("Ensuring no unexpected event has happened")
   240  		gomega.Eventually(ctx, framework.HandleRetry(func(ctx context.Context) (*v1.EventList, error) {
   241  			sj, err := getCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   242  			if err != nil {
   243  				return nil, err
   244  			}
   245  			return f.ClientSet.CoreV1().Events(f.Namespace.Name).Search(scheme.Scheme, sj)
   246  		})).WithPolling(framework.Poll).WithTimeout(30 * time.Second).Should(framework.MakeMatcher(func(actual *v1.EventList) (failure func() string, err error) {
   247  			for _, e := range actual.Items {
   248  				for _, reason := range []string{"MissingJob", "UnexpectedJob"} {
   249  					if e.Reason == reason {
   250  						return func() string {
   251  							return fmt.Sprintf("unexpected event: %s\n", reason)
   252  						}, nil
   253  					}
   254  				}
   255  			}
   256  			return nil, nil
   257  		}))
   258  
   259  		ginkgo.By("Removing cronjob")
   260  		err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   261  		framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
   262  	})
   263  
   264  	// deleted jobs should be removed from the active list
   265  	ginkgo.It("should remove from active list jobs that have been deleted", func(ctx context.Context) {
   266  		ginkgo.By("Creating a ForbidConcurrent cronjob")
   267  		cronJob := newTestCronJob("forbid", "*/1 * * * ?", batchv1.ForbidConcurrent,
   268  			sleepCommand, nil, nil)
   269  		cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   270  		framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
   271  
   272  		ginkgo.By("Ensuring a job is scheduled")
   273  		err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
   274  		framework.ExpectNoError(err, "Failed to ensure a %s cronjob is scheduled in namespace %s", cronJob.Name, f.Namespace.Name)
   275  
   276  		ginkgo.By("Ensuring exactly one is scheduled")
   277  		cronJob, err = getCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   278  		framework.ExpectNoError(err, "Failed to ensure exactly one %s cronjob is scheduled in namespace %s", cronJob.Name, f.Namespace.Name)
   279  		gomega.Expect(cronJob.Status.Active).Should(gomega.HaveLen(1))
   280  
   281  		ginkgo.By("Deleting the job")
   282  		job := cronJob.Status.Active[0]
   283  		framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
   284  
   285  		ginkgo.By("Ensuring job was deleted")
   286  		_, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   287  		gomega.Expect(err).To(gomega.MatchError(apierrors.IsNotFound, fmt.Sprintf("Failed to delete %s cronjob in namespace %s", cronJob.Name, f.Namespace.Name)))
   288  
   289  		ginkgo.By("Ensuring the job is not in the cronjob active list")
   290  		err = waitForJobNotActive(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, job.Name)
   291  		framework.ExpectNoError(err, "Failed to ensure the %s cronjob is not in active list in namespace %s", cronJob.Name, f.Namespace.Name)
   292  
   293  		ginkgo.By("Ensuring MissingJob event has occurred")
   294  		err = waitForEventWithReason(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, []string{"MissingJob"})
   295  		framework.ExpectNoError(err, "Failed to ensure missing job event has occurred for %s cronjob in namespace %s", cronJob.Name, f.Namespace.Name)
   296  
   297  		ginkgo.By("Removing cronjob")
   298  		err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
   299  		framework.ExpectNoError(err, "Failed to remove %s cronjob in namespace %s", cronJob.Name, f.Namespace.Name)
   300  	})
   301  
   302  	// cleanup of successful finished jobs, with limit of one successful job
   303  	ginkgo.It("should delete successful finished jobs with limit of one successful job", func(ctx context.Context) {
   304  		ginkgo.By("Creating an AllowConcurrent cronjob with custom history limit")
   305  		successLimit := int32(1)
   306  		failedLimit := int32(0)
   307  		cronJob := newTestCronJob("successful-jobs-history-limit", "*/1 * * * ?", batchv1.AllowConcurrent,
   308  			successCommand, &successLimit, &failedLimit)
   309  
   310  		ensureHistoryLimits(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   311  	})
   312  
   313  	// cleanup of failed finished jobs, with limit of one failed job
   314  	ginkgo.It("should delete failed finished jobs with limit of one job", func(ctx context.Context) {
   315  		ginkgo.By("Creating an AllowConcurrent cronjob with custom history limit")
   316  		successLimit := int32(0)
   317  		failedLimit := int32(1)
   318  		cronJob := newTestCronJob("failed-jobs-history-limit", "*/1 * * * ?", batchv1.AllowConcurrent,
   319  			failureCommand, &successLimit, &failedLimit)
   320  
   321  		ensureHistoryLimits(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   322  	})
   323  
   324  	ginkgo.It("should support timezone", func(ctx context.Context) {
   325  		ginkgo.By("Creating a cronjob with TimeZone")
   326  		cronJob := newTestCronJob("cronjob-with-timezone", "*/1 * * * ?", batchv1.AllowConcurrent,
   327  			failureCommand, nil, nil)
   328  		badTimeZone := "bad-time-zone"
   329  		cronJob.Spec.TimeZone = &badTimeZone
   330  		_, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
   331  		gomega.Expect(err).To(gomega.MatchError(apierrors.IsInvalid, "Failed to create CronJob, invalid time zone."))
   332  	})
   333  
   334  	/*
   335  	   Release: v1.21
   336  	   Testname: CronJob API Operations
   337  	   Description:
   338  	   CronJob MUST support create, get, list, watch, update, patch, delete, and deletecollection.
   339  	   CronJob/status MUST support get, update and patch.
   340  	*/
   341  	framework.ConformanceIt("should support CronJob API operations", func(ctx context.Context) {
   342  		ginkgo.By("Creating a cronjob")
   343  		successLimit := int32(1)
   344  		failedLimit := int32(0)
   345  		cjTemplate := newTestCronJob("test-api", "* */1 * * ?", batchv1.AllowConcurrent,
   346  			successCommand, &successLimit, &failedLimit)
   347  		cjTemplate.Labels = map[string]string{
   348  			"special-label": f.UniqueName,
   349  		}
   350  
   351  		ns := f.Namespace.Name
   352  		cjVersion := "v1"
   353  		cjClient := f.ClientSet.BatchV1().CronJobs(ns)
   354  
   355  		ginkgo.By("creating")
   356  		createdCronJob, err := cjClient.Create(ctx, cjTemplate, metav1.CreateOptions{})
   357  		framework.ExpectNoError(err)
   358  
   359  		ginkgo.By("getting")
   360  		gottenCronJob, err := cjClient.Get(ctx, createdCronJob.Name, metav1.GetOptions{})
   361  		framework.ExpectNoError(err)
   362  		gomega.Expect(gottenCronJob.UID).To(gomega.Equal(createdCronJob.UID))
   363  
   364  		ginkgo.By("listing")
   365  		cjs, err := cjClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
   366  		framework.ExpectNoError(err)
   367  		gomega.Expect(cjs.Items).To(gomega.HaveLen(1), "filtered list should have 1 item")
   368  
   369  		ginkgo.By("watching")
   370  		framework.Logf("starting watch")
   371  		cjWatch, err := cjClient.Watch(ctx, metav1.ListOptions{ResourceVersion: cjs.ResourceVersion, LabelSelector: "special-label=" + f.UniqueName})
   372  		framework.ExpectNoError(err)
   373  
   374  		// Test cluster-wide list and watch
   375  		clusterCJClient := f.ClientSet.BatchV1().CronJobs("")
   376  		ginkgo.By("cluster-wide listing")
   377  		clusterCJs, err := clusterCJClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
   378  		framework.ExpectNoError(err)
   379  		gomega.Expect(clusterCJs.Items).To(gomega.HaveLen(1), "filtered list should have 1 item")
   380  
   381  		ginkgo.By("cluster-wide watching")
   382  		framework.Logf("starting watch")
   383  		_, err = clusterCJClient.Watch(ctx, metav1.ListOptions{ResourceVersion: cjs.ResourceVersion, LabelSelector: "special-label=" + f.UniqueName})
   384  		framework.ExpectNoError(err)
   385  
   386  		ginkgo.By("patching")
   387  		patchedCronJob, err := cjClient.Patch(ctx, createdCronJob.Name, types.MergePatchType,
   388  			[]byte(`{"metadata":{"annotations":{"patched":"true"}}}`), metav1.PatchOptions{})
   389  		framework.ExpectNoError(err)
   390  		gomega.Expect(patchedCronJob.Annotations).To(gomega.HaveKeyWithValue("patched", "true"), "patched object should have the applied annotation")
   391  
   392  		ginkgo.By("updating")
   393  		var cjToUpdate, updatedCronJob *batchv1.CronJob
   394  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   395  			cjToUpdate, err = cjClient.Get(ctx, createdCronJob.Name, metav1.GetOptions{})
   396  			if err != nil {
   397  				return err
   398  			}
   399  			cjToUpdate.Annotations["updated"] = "true"
   400  			updatedCronJob, err = cjClient.Update(ctx, cjToUpdate, metav1.UpdateOptions{})
   401  			return err
   402  		})
   403  		framework.ExpectNoError(err)
   404  		gomega.Expect(updatedCronJob.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated object should have the applied annotation")
   405  
   406  		framework.Logf("waiting for watch events with expected annotations")
   407  		for sawAnnotations := false; !sawAnnotations; {
   408  			select {
   409  			case evt, ok := <-cjWatch.ResultChan():
   410  
   411  				if !ok {
   412  					framework.Fail("Watch channel is closed.")
   413  				}
   414  				gomega.Expect(evt.Type).To(gomega.Equal(watch.Modified))
   415  				watchedCronJob, isCronJob := evt.Object.(*batchv1.CronJob)
   416  				if !isCronJob {
   417  					framework.Failf("expected CronJob, got %T", evt.Object)
   418  				}
   419  				if watchedCronJob.Annotations["patched"] == "true" {
   420  					framework.Logf("saw patched and updated annotations")
   421  					sawAnnotations = true
   422  					cjWatch.Stop()
   423  				} else {
   424  					framework.Logf("missing expected annotations, waiting: %#v", watchedCronJob.Annotations)
   425  				}
   426  			case <-time.After(wait.ForeverTestTimeout):
   427  				framework.Fail("timed out waiting for watch event")
   428  			}
   429  		}
   430  
   431  		// /status subresource operations
   432  		ginkgo.By("patching /status")
   433  		// we need to use RFC3339 version since conversion over the wire cuts nanoseconds
   434  		now1 := metav1.Now().Rfc3339Copy()
   435  		cjStatus := batchv1.CronJobStatus{
   436  			LastScheduleTime: &now1,
   437  		}
   438  		cjStatusJSON, err := json.Marshal(cjStatus)
   439  		framework.ExpectNoError(err)
   440  		patchedStatus, err := cjClient.Patch(ctx, createdCronJob.Name, types.MergePatchType,
   441  			[]byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":`+string(cjStatusJSON)+`}`),
   442  			metav1.PatchOptions{}, "status")
   443  		framework.ExpectNoError(err)
   444  		if !patchedStatus.Status.LastScheduleTime.Equal(&now1) {
   445  			framework.Failf("patched object should have the applied lastScheduleTime %#v, got %#v instead", cjStatus.LastScheduleTime, patchedStatus.Status.LastScheduleTime)
   446  		}
   447  		gomega.Expect(patchedStatus.Annotations).To(gomega.HaveKeyWithValue("patchedstatus", "true"), "patched object should have the applied annotation")
   448  
   449  		ginkgo.By("updating /status")
   450  		// we need to use RFC3339 version since conversion over the wire cuts nanoseconds
   451  		now2 := metav1.Now().Rfc3339Copy()
   452  		var statusToUpdate, updatedStatus *batchv1.CronJob
   453  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   454  			statusToUpdate, err = cjClient.Get(ctx, createdCronJob.Name, metav1.GetOptions{})
   455  			if err != nil {
   456  				return err
   457  			}
   458  			statusToUpdate.Status.LastScheduleTime = &now2
   459  			updatedStatus, err = cjClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
   460  			return err
   461  		})
   462  		framework.ExpectNoError(err)
   463  
   464  		if !updatedStatus.Status.LastScheduleTime.Equal(&now2) {
   465  			framework.Failf("updated object status expected to have updated lastScheduleTime %#v, got %#v", statusToUpdate.Status.LastScheduleTime, updatedStatus.Status.LastScheduleTime)
   466  		}
   467  
   468  		ginkgo.By("get /status")
   469  		cjResource := schema.GroupVersionResource{Group: "batch", Version: cjVersion, Resource: "cronjobs"}
   470  		gottenStatus, err := f.DynamicClient.Resource(cjResource).Namespace(ns).Get(ctx, createdCronJob.Name, metav1.GetOptions{}, "status")
   471  		framework.ExpectNoError(err)
   472  		statusUID, _, err := unstructured.NestedFieldCopy(gottenStatus.Object, "metadata", "uid")
   473  		framework.ExpectNoError(err)
   474  		gomega.Expect(string(createdCronJob.UID)).To(gomega.Equal(statusUID), "createdCronJob.UID: %v expected to match statusUID: %v ", createdCronJob.UID, statusUID)
   475  
   476  		// CronJob resource delete operations
   477  		expectFinalizer := func(cj *batchv1.CronJob, msg string) {
   478  			gomega.Expect(cj.DeletionTimestamp).NotTo(gomega.BeNil(), fmt.Sprintf("expected deletionTimestamp, got nil on step: %q, cronjob: %+v", msg, cj))
   479  			gomega.Expect(cj.Finalizers).ToNot(gomega.BeEmpty(), "expected finalizers on cronjob, got none on step: %q, cronjob: %+v", msg, cj)
   480  		}
   481  
   482  		ginkgo.By("deleting")
   483  		cjTemplate.Name = "for-removal"
   484  		forRemovalCronJob, err := cjClient.Create(ctx, cjTemplate, metav1.CreateOptions{})
   485  		framework.ExpectNoError(err)
   486  		err = cjClient.Delete(ctx, forRemovalCronJob.Name, metav1.DeleteOptions{})
   487  		framework.ExpectNoError(err)
   488  		cj, err := cjClient.Get(ctx, forRemovalCronJob.Name, metav1.GetOptions{})
   489  		// If controller does not support finalizers, we expect a 404.  Otherwise we validate finalizer behavior.
   490  		if err == nil {
   491  			expectFinalizer(cj, "deleting cronjob")
   492  		} else if !apierrors.IsNotFound(err) {
   493  			framework.Failf("expected 404, got %v", err)
   494  		}
   495  
   496  		ginkgo.By("deleting a collection")
   497  		err = cjClient.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
   498  		framework.ExpectNoError(err)
   499  		cjs, err = cjClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
   500  		framework.ExpectNoError(err)
   501  		// Should have <= 2 items since some cronjobs might not have been deleted yet due to finalizers
   502  		gomega.Expect(len(cjs.Items)).To(gomega.BeNumerically("<=", 2), "filtered list length should be <= 2, got:\n%s", format.Object(cjs.Items, 1))
   503  		// Validate finalizers
   504  		for _, cj := range cjs.Items {
   505  			expectFinalizer(&cj, "deleting cronjob collection")
   506  		}
   507  	})
   508  
   509  })
   510  
   511  func ensureHistoryLimits(ctx context.Context, c clientset.Interface, ns string, cronJob *batchv1.CronJob) {
   512  	cronJob, err := createCronJob(ctx, c, ns, cronJob)
   513  	framework.ExpectNoError(err, "Failed to create allowconcurrent cronjob with custom history limits in namespace %s", ns)
   514  
   515  	// Job is going to complete instantly: do not check for an active job
   516  	// as we are most likely to miss it
   517  
   518  	ginkgo.By("Ensuring a finished job exists")
   519  	err = waitForAnyFinishedJob(ctx, c, ns)
   520  	framework.ExpectNoError(err, "Failed to ensure a finished cronjob exists in namespace %s", ns)
   521  
   522  	ginkgo.By("Ensuring a finished job exists by listing jobs explicitly")
   523  	jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
   524  	framework.ExpectNoError(err, "Failed to ensure a finished cronjob exists by listing jobs explicitly in namespace %s", ns)
   525  	activeJobs, finishedJobs := filterActiveJobs(jobs)
   526  	if len(finishedJobs) != 1 {
   527  		framework.Logf("Expected one finished job in namespace %s; activeJobs=%v; finishedJobs=%v", ns, activeJobs, finishedJobs)
   528  		gomega.Expect(finishedJobs).To(gomega.HaveLen(1))
   529  	}
   530  
   531  	// Job should get deleted when the next job finishes the next minute
   532  	ginkgo.By("Ensuring this job and its pods does not exist anymore")
   533  	err = waitForJobToDisappear(ctx, c, ns, finishedJobs[0])
   534  	framework.ExpectNoError(err, "Failed to ensure that job does not exists anymore in namespace %s", ns)
   535  	err = waitForJobsPodToDisappear(ctx, c, ns, finishedJobs[0])
   536  	framework.ExpectNoError(err, "Failed to ensure that pods for job does not exists anymore in namespace %s", ns)
   537  
   538  	ginkgo.By("Ensuring there is 1 finished job by listing jobs explicitly")
   539  	jobs, err = c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
   540  	framework.ExpectNoError(err, "Failed to ensure there is one finished job by listing job explicitly in namespace %s", ns)
   541  	activeJobs, finishedJobs = filterActiveJobs(jobs)
   542  	if len(finishedJobs) != 1 {
   543  		framework.Logf("Expected one finished job in namespace %s; activeJobs=%v; finishedJobs=%v", ns, activeJobs, finishedJobs)
   544  		gomega.Expect(finishedJobs).To(gomega.HaveLen(1))
   545  	}
   546  
   547  	ginkgo.By("Removing cronjob")
   548  	err = deleteCronJob(ctx, c, ns, cronJob.Name)
   549  	framework.ExpectNoError(err, "Failed to remove the %s cronjob in namespace %s", cronJob.Name, ns)
   550  }
   551  
   552  // newTestCronJob returns a cronjob which does one of several testing behaviors.
   553  func newTestCronJob(name, schedule string, concurrencyPolicy batchv1.ConcurrencyPolicy,
   554  	command []string, successfulJobsHistoryLimit *int32, failedJobsHistoryLimit *int32) *batchv1.CronJob {
   555  	parallelism := int32(1)
   556  	completions := int32(1)
   557  	backofflimit := int32(1)
   558  	sj := &batchv1.CronJob{
   559  		ObjectMeta: metav1.ObjectMeta{
   560  			Name: name,
   561  		},
   562  		TypeMeta: metav1.TypeMeta{
   563  			Kind: "CronJob",
   564  		},
   565  		Spec: batchv1.CronJobSpec{
   566  			Schedule:          schedule,
   567  			ConcurrencyPolicy: concurrencyPolicy,
   568  			JobTemplate: batchv1.JobTemplateSpec{
   569  				Spec: batchv1.JobSpec{
   570  					Parallelism:  &parallelism,
   571  					Completions:  &completions,
   572  					BackoffLimit: &backofflimit,
   573  					Template: v1.PodTemplateSpec{
   574  						Spec: v1.PodSpec{
   575  							RestartPolicy: v1.RestartPolicyOnFailure,
   576  							Volumes: []v1.Volume{
   577  								{
   578  									Name: "data",
   579  									VolumeSource: v1.VolumeSource{
   580  										EmptyDir: &v1.EmptyDirVolumeSource{},
   581  									},
   582  								},
   583  							},
   584  							Containers: []v1.Container{
   585  								{
   586  									Name:  "c",
   587  									Image: imageutils.GetE2EImage(imageutils.BusyBox),
   588  									VolumeMounts: []v1.VolumeMount{
   589  										{
   590  											MountPath: "/data",
   591  											Name:      "data",
   592  										},
   593  									},
   594  								},
   595  							},
   596  						},
   597  					},
   598  				},
   599  			},
   600  		},
   601  	}
   602  	sj.Spec.SuccessfulJobsHistoryLimit = successfulJobsHistoryLimit
   603  	sj.Spec.FailedJobsHistoryLimit = failedJobsHistoryLimit
   604  	if command != nil {
   605  		sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = command
   606  	}
   607  	return sj
   608  }
   609  
   610  func createCronJob(ctx context.Context, c clientset.Interface, ns string, cronJob *batchv1.CronJob) (*batchv1.CronJob, error) {
   611  	return c.BatchV1().CronJobs(ns).Create(ctx, cronJob, metav1.CreateOptions{})
   612  }
   613  
   614  func getCronJob(ctx context.Context, c clientset.Interface, ns, name string) (*batchv1.CronJob, error) {
   615  	return c.BatchV1().CronJobs(ns).Get(ctx, name, metav1.GetOptions{})
   616  }
   617  
   618  func deleteCronJob(ctx context.Context, c clientset.Interface, ns, name string) error {
   619  	propagationPolicy := metav1.DeletePropagationBackground // Also delete jobs and pods related to cronjob
   620  	return c.BatchV1().CronJobs(ns).Delete(ctx, name, metav1.DeleteOptions{PropagationPolicy: &propagationPolicy})
   621  }
   622  
   623  // Wait for at least given amount of active jobs.
   624  func waitForActiveJobs(ctx context.Context, c clientset.Interface, ns, cronJobName string, active int) error {
   625  	return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
   626  		curr, err := getCronJob(ctx, c, ns, cronJobName)
   627  		if err != nil {
   628  			return false, err
   629  		}
   630  		return len(curr.Status.Active) >= active, nil
   631  	})
   632  }
   633  
   634  // Wait till a given job actually goes away from the Active list for a given cronjob
   635  func waitForJobNotActive(ctx context.Context, c clientset.Interface, ns, cronJobName, jobName string) error {
   636  	return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
   637  		curr, err := getCronJob(ctx, c, ns, cronJobName)
   638  		if err != nil {
   639  			return false, err
   640  		}
   641  
   642  		for _, j := range curr.Status.Active {
   643  			if j.Name == jobName {
   644  				return false, nil
   645  			}
   646  		}
   647  		return true, nil
   648  	})
   649  }
   650  
   651  // Wait for a job to disappear by listing them explicitly.
   652  func waitForJobToDisappear(ctx context.Context, c clientset.Interface, ns string, targetJob *batchv1.Job) error {
   653  	return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
   654  		jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
   655  		if err != nil {
   656  			return false, err
   657  		}
   658  		_, finishedJobs := filterActiveJobs(jobs)
   659  		for _, job := range finishedJobs {
   660  			if targetJob.Namespace == job.Namespace && targetJob.Name == job.Name {
   661  				return false, nil
   662  			}
   663  		}
   664  		return true, nil
   665  	})
   666  }
   667  
   668  // Wait for a pod to disappear by listing them explicitly.
   669  func waitForJobsPodToDisappear(ctx context.Context, c clientset.Interface, ns string, targetJob *batchv1.Job) error {
   670  	return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
   671  		options := metav1.ListOptions{LabelSelector: fmt.Sprintf("controller-uid=%s", targetJob.UID)}
   672  		pods, err := c.CoreV1().Pods(ns).List(ctx, options)
   673  		if err != nil {
   674  			return false, err
   675  		}
   676  		return len(pods.Items) == 0, nil
   677  	})
   678  }
   679  
   680  // Wait for a job to be replaced with a new one.
   681  func waitForJobReplaced(ctx context.Context, c clientset.Interface, ns, previousJobName string) error {
   682  	return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
   683  		jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
   684  		if err != nil {
   685  			return false, err
   686  		}
   687  		// Ignore Jobs pending deletion, since deletion of Jobs is now asynchronous.
   688  		aliveJobs := filterNotDeletedJobs(jobs)
   689  		if len(aliveJobs) > 1 {
   690  			return false, fmt.Errorf("more than one job is running %+v", jobs.Items)
   691  		} else if len(aliveJobs) == 0 {
   692  			framework.Logf("Warning: Found 0 jobs in namespace %v", ns)
   693  			return false, nil
   694  		}
   695  		return aliveJobs[0].Name != previousJobName, nil
   696  	})
   697  }
   698  
   699  // waitForJobsAtLeast waits for at least a number of jobs to appear.
   700  func waitForJobsAtLeast(ctx context.Context, c clientset.Interface, ns string, atLeast int) error {
   701  	return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
   702  		jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
   703  		if err != nil {
   704  			return false, err
   705  		}
   706  		return len(jobs.Items) >= atLeast, nil
   707  	})
   708  }
   709  
   710  // waitForAnyFinishedJob waits for any completed job to appear.
   711  func waitForAnyFinishedJob(ctx context.Context, c clientset.Interface, ns string) error {
   712  	return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
   713  		jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
   714  		if err != nil {
   715  			return false, err
   716  		}
   717  		for i := range jobs.Items {
   718  			if job.IsJobFinished(&jobs.Items[i]) {
   719  				return true, nil
   720  			}
   721  		}
   722  		return false, nil
   723  	})
   724  }
   725  
   726  // waitForEventWithReason waits for events with a reason within a list has occurred
   727  func waitForEventWithReason(ctx context.Context, c clientset.Interface, ns, cronJobName string, reasons []string) error {
   728  	return wait.PollWithContext(ctx, framework.Poll, 30*time.Second, func(ctx context.Context) (bool, error) {
   729  		sj, err := getCronJob(ctx, c, ns, cronJobName)
   730  		if err != nil {
   731  			return false, err
   732  		}
   733  		events, err := c.CoreV1().Events(ns).Search(scheme.Scheme, sj)
   734  		if err != nil {
   735  			return false, err
   736  		}
   737  		for _, e := range events.Items {
   738  			for _, reason := range reasons {
   739  				if e.Reason == reason {
   740  					return true, nil
   741  				}
   742  			}
   743  		}
   744  		return false, nil
   745  	})
   746  }
   747  
   748  // filterNotDeletedJobs returns the job list without any jobs that are pending
   749  // deletion.
   750  func filterNotDeletedJobs(jobs *batchv1.JobList) []*batchv1.Job {
   751  	var alive []*batchv1.Job
   752  	for i := range jobs.Items {
   753  		job := &jobs.Items[i]
   754  		if job.DeletionTimestamp == nil {
   755  			alive = append(alive, job)
   756  		}
   757  	}
   758  	return alive
   759  }
   760  
   761  func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) {
   762  	for i := range jobs.Items {
   763  		j := jobs.Items[i]
   764  		if !job.IsJobFinished(&j) {
   765  			active = append(active, &j)
   766  		} else {
   767  			finished = append(finished, &j)
   768  		}
   769  	}
   770  	return
   771  }
   772  

View as plain text