1
16
17 package apps
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "time"
24
25 "github.com/onsi/ginkgo/v2"
26 "github.com/onsi/gomega"
27 "github.com/onsi/gomega/format"
28
29 batchv1 "k8s.io/api/batch/v1"
30 v1 "k8s.io/api/core/v1"
31 apierrors "k8s.io/apimachinery/pkg/api/errors"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
34 "k8s.io/apimachinery/pkg/runtime/schema"
35 "k8s.io/apimachinery/pkg/types"
36 "k8s.io/apimachinery/pkg/util/wait"
37 "k8s.io/apimachinery/pkg/watch"
38 clientset "k8s.io/client-go/kubernetes"
39 "k8s.io/client-go/kubernetes/scheme"
40 "k8s.io/client-go/util/retry"
41 batchinternal "k8s.io/kubernetes/pkg/apis/batch"
42 "k8s.io/kubernetes/pkg/controller/job"
43 "k8s.io/kubernetes/test/e2e/framework"
44 e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
45 e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
46 imageutils "k8s.io/kubernetes/test/utils/image"
47 admissionapi "k8s.io/pod-security-admission/api"
48 )
49
50 const (
51
52 cronJobTimeout = 5 * time.Minute
53 )
54
55 var _ = SIGDescribe("CronJob", func() {
56 f := framework.NewDefaultFramework("cronjob")
57 f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
58
59 sleepCommand := []string{"sleep", "300"}
60
61
62 successCommand := []string{"/bin/true"}
63 failureCommand := []string{"/bin/false"}
64
65
70 framework.ConformanceIt("should schedule multiple jobs concurrently", func(ctx context.Context) {
71 ginkgo.By("Creating a cronjob")
72 cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1.AllowConcurrent,
73 sleepCommand, nil, nil)
74 cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
75 framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
76
77 ginkgo.By("Ensuring more than one job is running at a time")
78 err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 2)
79 framework.ExpectNoError(err, "Failed to wait for active jobs in CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
80
81 ginkgo.By("Ensuring at least two running jobs exists by listing jobs explicitly")
82 jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
83 framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
84 activeJobs, _ := filterActiveJobs(jobs)
85 gomega.Expect(len(activeJobs)).To(gomega.BeNumerically(">=", 2))
86
87 ginkgo.By("Removing cronjob")
88 err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
89 framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
90 })
91
92
97 framework.ConformanceIt("should not schedule jobs when suspended", f.WithSlow(), func(ctx context.Context) {
98 ginkgo.By("Creating a suspended cronjob")
99 cronJob := newTestCronJob("suspended", "*/1 * * * ?", batchv1.AllowConcurrent,
100 sleepCommand, nil, nil)
101 t := true
102 cronJob.Spec.Suspend = &t
103 cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
104 framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
105
106 ginkgo.By("Ensuring no jobs are scheduled")
107 gomega.Consistently(ctx, framework.GetObject(f.ClientSet.BatchV1().CronJobs(f.Namespace.Name).Get, cronJob.Name, metav1.GetOptions{})).WithPolling(framework.Poll).WithTimeout(cronJobTimeout).
108 Should(gomega.HaveField("Status.Active", gomega.BeEmpty()))
109
110 ginkgo.By("Ensuring no job exists by listing jobs explicitly")
111 jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
112 framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
113 gomega.Expect(jobs.Items).To(gomega.BeEmpty())
114
115 ginkgo.By("Removing cronjob")
116 err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
117 framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
118 })
119
120
125 framework.ConformanceIt("should not schedule new jobs when ForbidConcurrent", f.WithSlow(), func(ctx context.Context) {
126 ginkgo.By("Creating a ForbidConcurrent cronjob")
127 cronJob := newTestCronJob("forbid", "*/1 * * * ?", batchv1.ForbidConcurrent,
128 sleepCommand, nil, nil)
129 cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
130 framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
131
132 ginkgo.By("Ensuring a job is scheduled")
133 err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
134 framework.ExpectNoError(err, "Failed to schedule CronJob %s", cronJob.Name)
135
136 ginkgo.By("Ensuring exactly one is scheduled")
137 cronJob, err = getCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
138 framework.ExpectNoError(err, "Failed to get CronJob %s", cronJob.Name)
139 gomega.Expect(cronJob.Status.Active).Should(gomega.HaveLen(1))
140
141 ginkgo.By("Ensuring exactly one running job exists by listing jobs explicitly")
142 jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
143 framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
144 activeJobs, _ := filterActiveJobs(jobs)
145 gomega.Expect(activeJobs).To(gomega.HaveLen(1))
146
147 ginkgo.By("Ensuring no more jobs are scheduled")
148 gomega.Eventually(ctx, framework.GetObject(f.ClientSet.BatchV1().CronJobs(f.Namespace.Name).Get, cronJob.Name, metav1.GetOptions{})).WithPolling(framework.Poll).WithTimeout(cronJobTimeout).
149 Should(framework.MakeMatcher(func(cj *batchv1.CronJob) (func() string, error) {
150 if len(cj.Status.Active) < 2 {
151 return nil, nil
152 }
153 return func() string {
154 return fmt.Sprintf("unexpect active job number: %d\n", len(cj.Status.Active))
155 }, nil
156 }))
157
158 ginkgo.By("Removing cronjob")
159 err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
160 framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
161 })
162
163
168 framework.ConformanceIt("should replace jobs when ReplaceConcurrent", func(ctx context.Context) {
169 ginkgo.By("Creating a ReplaceConcurrent cronjob")
170 cronJob := newTestCronJob("replace", "*/1 * * * ?", batchv1.ReplaceConcurrent,
171 sleepCommand, nil, nil)
172 cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
173 framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
174
175 ginkgo.By("Ensuring a job is scheduled")
176 err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
177 framework.ExpectNoError(err, "Failed to schedule CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
178
179 ginkgo.By("Ensuring exactly one is scheduled")
180 cronJob, err = getCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
181 framework.ExpectNoError(err, "Failed to get CronJob %s", cronJob.Name)
182 gomega.Expect(cronJob.Status.Active).Should(gomega.HaveLen(1))
183
184 ginkgo.By("Ensuring exactly one running job exists by listing jobs explicitly")
185 jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
186 framework.ExpectNoError(err, "Failed to list the jobs in namespace %s", f.Namespace.Name)
187 activeJobs, _ := filterActiveJobs(jobs)
188 gomega.Expect(activeJobs).To(gomega.HaveLen(1))
189
190 ginkgo.By("Ensuring the job is replaced with a new one")
191 err = waitForJobReplaced(ctx, f.ClientSet, f.Namespace.Name, jobs.Items[0].Name)
192 framework.ExpectNoError(err, "Failed to replace CronJob %s in namespace %s", jobs.Items[0].Name, f.Namespace.Name)
193
194 ginkgo.By("Removing cronjob")
195 err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
196 framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
197 })
198
199 ginkgo.It("should be able to schedule after more than 100 missed schedule", func(ctx context.Context) {
200 ginkgo.By("Creating a cronjob")
201 cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1.ForbidConcurrent,
202 sleepCommand, nil, nil)
203 creationTime := time.Now().Add(-99 * 24 * time.Hour)
204 lastScheduleTime := creationTime.Add(1 * 24 * time.Hour)
205 cronJob.CreationTimestamp = metav1.Time{Time: creationTime}
206 cronJob.Status.LastScheduleTime = &metav1.Time{Time: lastScheduleTime}
207 cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
208 framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
209
210 ginkgo.By("Ensuring one job is running")
211 err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
212 framework.ExpectNoError(err, "Failed to wait for active jobs in CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
213
214 ginkgo.By("Ensuring at least one running jobs exists by listing jobs explicitly")
215 jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(ctx, metav1.ListOptions{})
216 framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
217 activeJobs, _ := filterActiveJobs(jobs)
218 gomega.Expect(activeJobs).ToNot(gomega.BeEmpty())
219
220 ginkgo.By("Removing cronjob")
221 err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
222 framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
223 })
224
225
226 ginkgo.It("should not emit unexpected warnings", func(ctx context.Context) {
227 ginkgo.By("Creating a cronjob")
228 cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1.AllowConcurrent,
229 nil, nil, nil)
230 cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
231 framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
232
233 ginkgo.By("Ensuring at least two jobs and at least one finished job exists by listing jobs explicitly")
234 err = waitForJobsAtLeast(ctx, f.ClientSet, f.Namespace.Name, 2)
235 framework.ExpectNoError(err, "Failed to ensure at least two job exists in namespace %s", f.Namespace.Name)
236 err = waitForAnyFinishedJob(ctx, f.ClientSet, f.Namespace.Name)
237 framework.ExpectNoError(err, "Failed to ensure at least on finished job exists in namespace %s", f.Namespace.Name)
238
239 ginkgo.By("Ensuring no unexpected event has happened")
240 gomega.Eventually(ctx, framework.HandleRetry(func(ctx context.Context) (*v1.EventList, error) {
241 sj, err := getCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
242 if err != nil {
243 return nil, err
244 }
245 return f.ClientSet.CoreV1().Events(f.Namespace.Name).Search(scheme.Scheme, sj)
246 })).WithPolling(framework.Poll).WithTimeout(30 * time.Second).Should(framework.MakeMatcher(func(actual *v1.EventList) (failure func() string, err error) {
247 for _, e := range actual.Items {
248 for _, reason := range []string{"MissingJob", "UnexpectedJob"} {
249 if e.Reason == reason {
250 return func() string {
251 return fmt.Sprintf("unexpected event: %s\n", reason)
252 }, nil
253 }
254 }
255 }
256 return nil, nil
257 }))
258
259 ginkgo.By("Removing cronjob")
260 err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
261 framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
262 })
263
264
265 ginkgo.It("should remove from active list jobs that have been deleted", func(ctx context.Context) {
266 ginkgo.By("Creating a ForbidConcurrent cronjob")
267 cronJob := newTestCronJob("forbid", "*/1 * * * ?", batchv1.ForbidConcurrent,
268 sleepCommand, nil, nil)
269 cronJob, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
270 framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
271
272 ginkgo.By("Ensuring a job is scheduled")
273 err = waitForActiveJobs(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
274 framework.ExpectNoError(err, "Failed to ensure a %s cronjob is scheduled in namespace %s", cronJob.Name, f.Namespace.Name)
275
276 ginkgo.By("Ensuring exactly one is scheduled")
277 cronJob, err = getCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
278 framework.ExpectNoError(err, "Failed to ensure exactly one %s cronjob is scheduled in namespace %s", cronJob.Name, f.Namespace.Name)
279 gomega.Expect(cronJob.Status.Active).Should(gomega.HaveLen(1))
280
281 ginkgo.By("Deleting the job")
282 job := cronJob.Status.Active[0]
283 framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
284
285 ginkgo.By("Ensuring job was deleted")
286 _, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
287 gomega.Expect(err).To(gomega.MatchError(apierrors.IsNotFound, fmt.Sprintf("Failed to delete %s cronjob in namespace %s", cronJob.Name, f.Namespace.Name)))
288
289 ginkgo.By("Ensuring the job is not in the cronjob active list")
290 err = waitForJobNotActive(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, job.Name)
291 framework.ExpectNoError(err, "Failed to ensure the %s cronjob is not in active list in namespace %s", cronJob.Name, f.Namespace.Name)
292
293 ginkgo.By("Ensuring MissingJob event has occurred")
294 err = waitForEventWithReason(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name, []string{"MissingJob"})
295 framework.ExpectNoError(err, "Failed to ensure missing job event has occurred for %s cronjob in namespace %s", cronJob.Name, f.Namespace.Name)
296
297 ginkgo.By("Removing cronjob")
298 err = deleteCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob.Name)
299 framework.ExpectNoError(err, "Failed to remove %s cronjob in namespace %s", cronJob.Name, f.Namespace.Name)
300 })
301
302
303 ginkgo.It("should delete successful finished jobs with limit of one successful job", func(ctx context.Context) {
304 ginkgo.By("Creating an AllowConcurrent cronjob with custom history limit")
305 successLimit := int32(1)
306 failedLimit := int32(0)
307 cronJob := newTestCronJob("successful-jobs-history-limit", "*/1 * * * ?", batchv1.AllowConcurrent,
308 successCommand, &successLimit, &failedLimit)
309
310 ensureHistoryLimits(ctx, f.ClientSet, f.Namespace.Name, cronJob)
311 })
312
313
314 ginkgo.It("should delete failed finished jobs with limit of one job", func(ctx context.Context) {
315 ginkgo.By("Creating an AllowConcurrent cronjob with custom history limit")
316 successLimit := int32(0)
317 failedLimit := int32(1)
318 cronJob := newTestCronJob("failed-jobs-history-limit", "*/1 * * * ?", batchv1.AllowConcurrent,
319 failureCommand, &successLimit, &failedLimit)
320
321 ensureHistoryLimits(ctx, f.ClientSet, f.Namespace.Name, cronJob)
322 })
323
324 ginkgo.It("should support timezone", func(ctx context.Context) {
325 ginkgo.By("Creating a cronjob with TimeZone")
326 cronJob := newTestCronJob("cronjob-with-timezone", "*/1 * * * ?", batchv1.AllowConcurrent,
327 failureCommand, nil, nil)
328 badTimeZone := "bad-time-zone"
329 cronJob.Spec.TimeZone = &badTimeZone
330 _, err := createCronJob(ctx, f.ClientSet, f.Namespace.Name, cronJob)
331 gomega.Expect(err).To(gomega.MatchError(apierrors.IsInvalid, "Failed to create CronJob, invalid time zone."))
332 })
333
334
341 framework.ConformanceIt("should support CronJob API operations", func(ctx context.Context) {
342 ginkgo.By("Creating a cronjob")
343 successLimit := int32(1)
344 failedLimit := int32(0)
345 cjTemplate := newTestCronJob("test-api", "* */1 * * ?", batchv1.AllowConcurrent,
346 successCommand, &successLimit, &failedLimit)
347 cjTemplate.Labels = map[string]string{
348 "special-label": f.UniqueName,
349 }
350
351 ns := f.Namespace.Name
352 cjVersion := "v1"
353 cjClient := f.ClientSet.BatchV1().CronJobs(ns)
354
355 ginkgo.By("creating")
356 createdCronJob, err := cjClient.Create(ctx, cjTemplate, metav1.CreateOptions{})
357 framework.ExpectNoError(err)
358
359 ginkgo.By("getting")
360 gottenCronJob, err := cjClient.Get(ctx, createdCronJob.Name, metav1.GetOptions{})
361 framework.ExpectNoError(err)
362 gomega.Expect(gottenCronJob.UID).To(gomega.Equal(createdCronJob.UID))
363
364 ginkgo.By("listing")
365 cjs, err := cjClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
366 framework.ExpectNoError(err)
367 gomega.Expect(cjs.Items).To(gomega.HaveLen(1), "filtered list should have 1 item")
368
369 ginkgo.By("watching")
370 framework.Logf("starting watch")
371 cjWatch, err := cjClient.Watch(ctx, metav1.ListOptions{ResourceVersion: cjs.ResourceVersion, LabelSelector: "special-label=" + f.UniqueName})
372 framework.ExpectNoError(err)
373
374
375 clusterCJClient := f.ClientSet.BatchV1().CronJobs("")
376 ginkgo.By("cluster-wide listing")
377 clusterCJs, err := clusterCJClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
378 framework.ExpectNoError(err)
379 gomega.Expect(clusterCJs.Items).To(gomega.HaveLen(1), "filtered list should have 1 item")
380
381 ginkgo.By("cluster-wide watching")
382 framework.Logf("starting watch")
383 _, err = clusterCJClient.Watch(ctx, metav1.ListOptions{ResourceVersion: cjs.ResourceVersion, LabelSelector: "special-label=" + f.UniqueName})
384 framework.ExpectNoError(err)
385
386 ginkgo.By("patching")
387 patchedCronJob, err := cjClient.Patch(ctx, createdCronJob.Name, types.MergePatchType,
388 []byte(`{"metadata":{"annotations":{"patched":"true"}}}`), metav1.PatchOptions{})
389 framework.ExpectNoError(err)
390 gomega.Expect(patchedCronJob.Annotations).To(gomega.HaveKeyWithValue("patched", "true"), "patched object should have the applied annotation")
391
392 ginkgo.By("updating")
393 var cjToUpdate, updatedCronJob *batchv1.CronJob
394 err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
395 cjToUpdate, err = cjClient.Get(ctx, createdCronJob.Name, metav1.GetOptions{})
396 if err != nil {
397 return err
398 }
399 cjToUpdate.Annotations["updated"] = "true"
400 updatedCronJob, err = cjClient.Update(ctx, cjToUpdate, metav1.UpdateOptions{})
401 return err
402 })
403 framework.ExpectNoError(err)
404 gomega.Expect(updatedCronJob.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated object should have the applied annotation")
405
406 framework.Logf("waiting for watch events with expected annotations")
407 for sawAnnotations := false; !sawAnnotations; {
408 select {
409 case evt, ok := <-cjWatch.ResultChan():
410
411 if !ok {
412 framework.Fail("Watch channel is closed.")
413 }
414 gomega.Expect(evt.Type).To(gomega.Equal(watch.Modified))
415 watchedCronJob, isCronJob := evt.Object.(*batchv1.CronJob)
416 if !isCronJob {
417 framework.Failf("expected CronJob, got %T", evt.Object)
418 }
419 if watchedCronJob.Annotations["patched"] == "true" {
420 framework.Logf("saw patched and updated annotations")
421 sawAnnotations = true
422 cjWatch.Stop()
423 } else {
424 framework.Logf("missing expected annotations, waiting: %#v", watchedCronJob.Annotations)
425 }
426 case <-time.After(wait.ForeverTestTimeout):
427 framework.Fail("timed out waiting for watch event")
428 }
429 }
430
431
432 ginkgo.By("patching /status")
433
434 now1 := metav1.Now().Rfc3339Copy()
435 cjStatus := batchv1.CronJobStatus{
436 LastScheduleTime: &now1,
437 }
438 cjStatusJSON, err := json.Marshal(cjStatus)
439 framework.ExpectNoError(err)
440 patchedStatus, err := cjClient.Patch(ctx, createdCronJob.Name, types.MergePatchType,
441 []byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":`+string(cjStatusJSON)+`}`),
442 metav1.PatchOptions{}, "status")
443 framework.ExpectNoError(err)
444 if !patchedStatus.Status.LastScheduleTime.Equal(&now1) {
445 framework.Failf("patched object should have the applied lastScheduleTime %#v, got %#v instead", cjStatus.LastScheduleTime, patchedStatus.Status.LastScheduleTime)
446 }
447 gomega.Expect(patchedStatus.Annotations).To(gomega.HaveKeyWithValue("patchedstatus", "true"), "patched object should have the applied annotation")
448
449 ginkgo.By("updating /status")
450
451 now2 := metav1.Now().Rfc3339Copy()
452 var statusToUpdate, updatedStatus *batchv1.CronJob
453 err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
454 statusToUpdate, err = cjClient.Get(ctx, createdCronJob.Name, metav1.GetOptions{})
455 if err != nil {
456 return err
457 }
458 statusToUpdate.Status.LastScheduleTime = &now2
459 updatedStatus, err = cjClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
460 return err
461 })
462 framework.ExpectNoError(err)
463
464 if !updatedStatus.Status.LastScheduleTime.Equal(&now2) {
465 framework.Failf("updated object status expected to have updated lastScheduleTime %#v, got %#v", statusToUpdate.Status.LastScheduleTime, updatedStatus.Status.LastScheduleTime)
466 }
467
468 ginkgo.By("get /status")
469 cjResource := schema.GroupVersionResource{Group: "batch", Version: cjVersion, Resource: "cronjobs"}
470 gottenStatus, err := f.DynamicClient.Resource(cjResource).Namespace(ns).Get(ctx, createdCronJob.Name, metav1.GetOptions{}, "status")
471 framework.ExpectNoError(err)
472 statusUID, _, err := unstructured.NestedFieldCopy(gottenStatus.Object, "metadata", "uid")
473 framework.ExpectNoError(err)
474 gomega.Expect(string(createdCronJob.UID)).To(gomega.Equal(statusUID), "createdCronJob.UID: %v expected to match statusUID: %v ", createdCronJob.UID, statusUID)
475
476
477 expectFinalizer := func(cj *batchv1.CronJob, msg string) {
478 gomega.Expect(cj.DeletionTimestamp).NotTo(gomega.BeNil(), fmt.Sprintf("expected deletionTimestamp, got nil on step: %q, cronjob: %+v", msg, cj))
479 gomega.Expect(cj.Finalizers).ToNot(gomega.BeEmpty(), "expected finalizers on cronjob, got none on step: %q, cronjob: %+v", msg, cj)
480 }
481
482 ginkgo.By("deleting")
483 cjTemplate.Name = "for-removal"
484 forRemovalCronJob, err := cjClient.Create(ctx, cjTemplate, metav1.CreateOptions{})
485 framework.ExpectNoError(err)
486 err = cjClient.Delete(ctx, forRemovalCronJob.Name, metav1.DeleteOptions{})
487 framework.ExpectNoError(err)
488 cj, err := cjClient.Get(ctx, forRemovalCronJob.Name, metav1.GetOptions{})
489
490 if err == nil {
491 expectFinalizer(cj, "deleting cronjob")
492 } else if !apierrors.IsNotFound(err) {
493 framework.Failf("expected 404, got %v", err)
494 }
495
496 ginkgo.By("deleting a collection")
497 err = cjClient.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
498 framework.ExpectNoError(err)
499 cjs, err = cjClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
500 framework.ExpectNoError(err)
501
502 gomega.Expect(len(cjs.Items)).To(gomega.BeNumerically("<=", 2), "filtered list length should be <= 2, got:\n%s", format.Object(cjs.Items, 1))
503
504 for _, cj := range cjs.Items {
505 expectFinalizer(&cj, "deleting cronjob collection")
506 }
507 })
508
509 })
510
511 func ensureHistoryLimits(ctx context.Context, c clientset.Interface, ns string, cronJob *batchv1.CronJob) {
512 cronJob, err := createCronJob(ctx, c, ns, cronJob)
513 framework.ExpectNoError(err, "Failed to create allowconcurrent cronjob with custom history limits in namespace %s", ns)
514
515
516
517
518 ginkgo.By("Ensuring a finished job exists")
519 err = waitForAnyFinishedJob(ctx, c, ns)
520 framework.ExpectNoError(err, "Failed to ensure a finished cronjob exists in namespace %s", ns)
521
522 ginkgo.By("Ensuring a finished job exists by listing jobs explicitly")
523 jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
524 framework.ExpectNoError(err, "Failed to ensure a finished cronjob exists by listing jobs explicitly in namespace %s", ns)
525 activeJobs, finishedJobs := filterActiveJobs(jobs)
526 if len(finishedJobs) != 1 {
527 framework.Logf("Expected one finished job in namespace %s; activeJobs=%v; finishedJobs=%v", ns, activeJobs, finishedJobs)
528 gomega.Expect(finishedJobs).To(gomega.HaveLen(1))
529 }
530
531
532 ginkgo.By("Ensuring this job and its pods does not exist anymore")
533 err = waitForJobToDisappear(ctx, c, ns, finishedJobs[0])
534 framework.ExpectNoError(err, "Failed to ensure that job does not exists anymore in namespace %s", ns)
535 err = waitForJobsPodToDisappear(ctx, c, ns, finishedJobs[0])
536 framework.ExpectNoError(err, "Failed to ensure that pods for job does not exists anymore in namespace %s", ns)
537
538 ginkgo.By("Ensuring there is 1 finished job by listing jobs explicitly")
539 jobs, err = c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
540 framework.ExpectNoError(err, "Failed to ensure there is one finished job by listing job explicitly in namespace %s", ns)
541 activeJobs, finishedJobs = filterActiveJobs(jobs)
542 if len(finishedJobs) != 1 {
543 framework.Logf("Expected one finished job in namespace %s; activeJobs=%v; finishedJobs=%v", ns, activeJobs, finishedJobs)
544 gomega.Expect(finishedJobs).To(gomega.HaveLen(1))
545 }
546
547 ginkgo.By("Removing cronjob")
548 err = deleteCronJob(ctx, c, ns, cronJob.Name)
549 framework.ExpectNoError(err, "Failed to remove the %s cronjob in namespace %s", cronJob.Name, ns)
550 }
551
552
553 func newTestCronJob(name, schedule string, concurrencyPolicy batchv1.ConcurrencyPolicy,
554 command []string, successfulJobsHistoryLimit *int32, failedJobsHistoryLimit *int32) *batchv1.CronJob {
555 parallelism := int32(1)
556 completions := int32(1)
557 backofflimit := int32(1)
558 sj := &batchv1.CronJob{
559 ObjectMeta: metav1.ObjectMeta{
560 Name: name,
561 },
562 TypeMeta: metav1.TypeMeta{
563 Kind: "CronJob",
564 },
565 Spec: batchv1.CronJobSpec{
566 Schedule: schedule,
567 ConcurrencyPolicy: concurrencyPolicy,
568 JobTemplate: batchv1.JobTemplateSpec{
569 Spec: batchv1.JobSpec{
570 Parallelism: ¶llelism,
571 Completions: &completions,
572 BackoffLimit: &backofflimit,
573 Template: v1.PodTemplateSpec{
574 Spec: v1.PodSpec{
575 RestartPolicy: v1.RestartPolicyOnFailure,
576 Volumes: []v1.Volume{
577 {
578 Name: "data",
579 VolumeSource: v1.VolumeSource{
580 EmptyDir: &v1.EmptyDirVolumeSource{},
581 },
582 },
583 },
584 Containers: []v1.Container{
585 {
586 Name: "c",
587 Image: imageutils.GetE2EImage(imageutils.BusyBox),
588 VolumeMounts: []v1.VolumeMount{
589 {
590 MountPath: "/data",
591 Name: "data",
592 },
593 },
594 },
595 },
596 },
597 },
598 },
599 },
600 },
601 }
602 sj.Spec.SuccessfulJobsHistoryLimit = successfulJobsHistoryLimit
603 sj.Spec.FailedJobsHistoryLimit = failedJobsHistoryLimit
604 if command != nil {
605 sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = command
606 }
607 return sj
608 }
609
610 func createCronJob(ctx context.Context, c clientset.Interface, ns string, cronJob *batchv1.CronJob) (*batchv1.CronJob, error) {
611 return c.BatchV1().CronJobs(ns).Create(ctx, cronJob, metav1.CreateOptions{})
612 }
613
614 func getCronJob(ctx context.Context, c clientset.Interface, ns, name string) (*batchv1.CronJob, error) {
615 return c.BatchV1().CronJobs(ns).Get(ctx, name, metav1.GetOptions{})
616 }
617
618 func deleteCronJob(ctx context.Context, c clientset.Interface, ns, name string) error {
619 propagationPolicy := metav1.DeletePropagationBackground
620 return c.BatchV1().CronJobs(ns).Delete(ctx, name, metav1.DeleteOptions{PropagationPolicy: &propagationPolicy})
621 }
622
623
624 func waitForActiveJobs(ctx context.Context, c clientset.Interface, ns, cronJobName string, active int) error {
625 return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
626 curr, err := getCronJob(ctx, c, ns, cronJobName)
627 if err != nil {
628 return false, err
629 }
630 return len(curr.Status.Active) >= active, nil
631 })
632 }
633
634
635 func waitForJobNotActive(ctx context.Context, c clientset.Interface, ns, cronJobName, jobName string) error {
636 return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
637 curr, err := getCronJob(ctx, c, ns, cronJobName)
638 if err != nil {
639 return false, err
640 }
641
642 for _, j := range curr.Status.Active {
643 if j.Name == jobName {
644 return false, nil
645 }
646 }
647 return true, nil
648 })
649 }
650
651
652 func waitForJobToDisappear(ctx context.Context, c clientset.Interface, ns string, targetJob *batchv1.Job) error {
653 return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
654 jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
655 if err != nil {
656 return false, err
657 }
658 _, finishedJobs := filterActiveJobs(jobs)
659 for _, job := range finishedJobs {
660 if targetJob.Namespace == job.Namespace && targetJob.Name == job.Name {
661 return false, nil
662 }
663 }
664 return true, nil
665 })
666 }
667
668
669 func waitForJobsPodToDisappear(ctx context.Context, c clientset.Interface, ns string, targetJob *batchv1.Job) error {
670 return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
671 options := metav1.ListOptions{LabelSelector: fmt.Sprintf("controller-uid=%s", targetJob.UID)}
672 pods, err := c.CoreV1().Pods(ns).List(ctx, options)
673 if err != nil {
674 return false, err
675 }
676 return len(pods.Items) == 0, nil
677 })
678 }
679
680
681 func waitForJobReplaced(ctx context.Context, c clientset.Interface, ns, previousJobName string) error {
682 return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
683 jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
684 if err != nil {
685 return false, err
686 }
687
688 aliveJobs := filterNotDeletedJobs(jobs)
689 if len(aliveJobs) > 1 {
690 return false, fmt.Errorf("more than one job is running %+v", jobs.Items)
691 } else if len(aliveJobs) == 0 {
692 framework.Logf("Warning: Found 0 jobs in namespace %v", ns)
693 return false, nil
694 }
695 return aliveJobs[0].Name != previousJobName, nil
696 })
697 }
698
699
700 func waitForJobsAtLeast(ctx context.Context, c clientset.Interface, ns string, atLeast int) error {
701 return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
702 jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
703 if err != nil {
704 return false, err
705 }
706 return len(jobs.Items) >= atLeast, nil
707 })
708 }
709
710
711 func waitForAnyFinishedJob(ctx context.Context, c clientset.Interface, ns string) error {
712 return wait.PollWithContext(ctx, framework.Poll, cronJobTimeout, func(ctx context.Context) (bool, error) {
713 jobs, err := c.BatchV1().Jobs(ns).List(ctx, metav1.ListOptions{})
714 if err != nil {
715 return false, err
716 }
717 for i := range jobs.Items {
718 if job.IsJobFinished(&jobs.Items[i]) {
719 return true, nil
720 }
721 }
722 return false, nil
723 })
724 }
725
726
727 func waitForEventWithReason(ctx context.Context, c clientset.Interface, ns, cronJobName string, reasons []string) error {
728 return wait.PollWithContext(ctx, framework.Poll, 30*time.Second, func(ctx context.Context) (bool, error) {
729 sj, err := getCronJob(ctx, c, ns, cronJobName)
730 if err != nil {
731 return false, err
732 }
733 events, err := c.CoreV1().Events(ns).Search(scheme.Scheme, sj)
734 if err != nil {
735 return false, err
736 }
737 for _, e := range events.Items {
738 for _, reason := range reasons {
739 if e.Reason == reason {
740 return true, nil
741 }
742 }
743 }
744 return false, nil
745 })
746 }
747
748
749
750 func filterNotDeletedJobs(jobs *batchv1.JobList) []*batchv1.Job {
751 var alive []*batchv1.Job
752 for i := range jobs.Items {
753 job := &jobs.Items[i]
754 if job.DeletionTimestamp == nil {
755 alive = append(alive, job)
756 }
757 }
758 return alive
759 }
760
761 func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) {
762 for i := range jobs.Items {
763 j := jobs.Items[i]
764 if !job.IsJobFinished(&j) {
765 active = append(active, &j)
766 } else {
767 finished = append(finished, &j)
768 }
769 }
770 return
771 }
772
View as plain text