1
16
17 package cronjob
18
19 import (
20 "context"
21 "fmt"
22 "testing"
23 "time"
24
25 batchv1 "k8s.io/api/batch/v1"
26 corev1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/util/wait"
29 "k8s.io/client-go/informers"
30 clientset "k8s.io/client-go/kubernetes"
31 clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
32 restclient "k8s.io/client-go/rest"
33 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
34 "k8s.io/kubernetes/pkg/controller/cronjob"
35 "k8s.io/kubernetes/pkg/controller/job"
36 "k8s.io/kubernetes/test/integration/framework"
37 "k8s.io/kubernetes/test/utils/ktesting"
38 )
39
40 func setup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *cronjob.ControllerV2, *job.Controller, informers.SharedInformerFactory, clientset.Interface) {
41
42 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
43
44 config := restclient.CopyConfig(server.ClientConfig)
45 clientSet, err := clientset.NewForConfig(config)
46 if err != nil {
47 t.Fatalf("Error creating clientset: %v", err)
48 }
49 resyncPeriod := 12 * time.Hour
50 informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "cronjob-informers")), resyncPeriod)
51 cjc, err := cronjob.NewControllerV2(ctx, informerSet.Batch().V1().Jobs(), informerSet.Batch().V1().CronJobs(), clientSet)
52 if err != nil {
53 t.Fatalf("Error creating CronJob controller: %v", err)
54 }
55 jc, err := job.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
56 if err != nil {
57 t.Fatalf("Error creating Job controller: %v", err)
58 }
59
60 return server.TearDownFn, cjc, jc, informerSet, clientSet
61 }
62
63 func newCronJob(name, namespace, schedule string) *batchv1.CronJob {
64 zero64 := int64(0)
65 zero32 := int32(0)
66 return &batchv1.CronJob{
67 TypeMeta: metav1.TypeMeta{
68 Kind: "CronJob",
69 APIVersion: "batch/v1",
70 },
71 ObjectMeta: metav1.ObjectMeta{
72 Namespace: namespace,
73 Name: name,
74 },
75 Spec: batchv1.CronJobSpec{
76 Schedule: schedule,
77 SuccessfulJobsHistoryLimit: &zero32,
78 JobTemplate: batchv1.JobTemplateSpec{
79 Spec: batchv1.JobSpec{
80 Template: corev1.PodTemplateSpec{
81 Spec: corev1.PodSpec{
82 Containers: []corev1.Container{{Name: "foo", Image: "bar"}},
83 TerminationGracePeriodSeconds: &zero64,
84 RestartPolicy: "Never",
85 },
86 },
87 },
88 },
89 },
90 }
91 }
92
93 func cleanupCronJobs(t *testing.T, cjClient clientbatchv1.CronJobInterface, name string) {
94 deletePropagation := metav1.DeletePropagationForeground
95 err := cjClient.Delete(context.TODO(), name, metav1.DeleteOptions{PropagationPolicy: &deletePropagation})
96 if err != nil {
97 t.Errorf("Failed to delete CronJob: %v", err)
98 }
99 }
100
101 func validateJobAndPod(t *testing.T, clientSet clientset.Interface, namespace string) {
102 if err := wait.PollImmediate(1*time.Second, 120*time.Second, func() (bool, error) {
103 jobs, err := clientSet.BatchV1().Jobs(namespace).List(context.TODO(), metav1.ListOptions{})
104 if err != nil {
105 t.Fatalf("Failed to list jobs: %v", err)
106 }
107
108 if len(jobs.Items) == 0 {
109 return false, nil
110 }
111
112 for _, j := range jobs.Items {
113 ownerReferences := j.ObjectMeta.OwnerReferences
114 if refCount := len(ownerReferences); refCount != 1 {
115 return false, fmt.Errorf("job %s has %d OwnerReferences, expected only 1", j.Name, refCount)
116 }
117
118 reference := ownerReferences[0]
119 if reference.Kind != "CronJob" {
120 return false, fmt.Errorf("job %s has OwnerReference with Kind %s, expected CronJob", j.Name, reference.Kind)
121 }
122 }
123
124 pods, err := clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
125 if err != nil {
126 t.Fatalf("Failed to list pods: %v", err)
127 }
128
129 if len(pods.Items) != 1 {
130 return false, nil
131 }
132
133 for _, pod := range pods.Items {
134 ownerReferences := pod.ObjectMeta.OwnerReferences
135 if refCount := len(ownerReferences); refCount != 1 {
136 return false, fmt.Errorf("pod %s has %d OwnerReferences, expected only 1", pod.Name, refCount)
137 }
138
139 reference := ownerReferences[0]
140 if reference.Kind != "Job" {
141 return false, fmt.Errorf("pod %s has OwnerReference with Kind %s, expected Job", pod.Name, reference.Kind)
142 }
143 }
144 return true, nil
145 }); err != nil {
146 t.Fatal(err)
147 }
148 }
149
150 func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
151 tCtx := ktesting.Init(t)
152
153 closeFn, cjc, jc, informerSet, clientSet := setup(tCtx, t)
154 defer closeFn()
155
156
157 defer tCtx.Cancel("test has completed")
158
159 cronJobName := "foo"
160 namespaceName := "simple-cronjob-test"
161
162 ns := framework.CreateNamespaceOrDie(clientSet, namespaceName, t)
163 defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
164
165 cjClient := clientSet.BatchV1().CronJobs(ns.Name)
166
167 informerSet.Start(tCtx.Done())
168 go cjc.Run(tCtx, 1)
169 go jc.Run(tCtx, 1)
170
171 _, err := cjClient.Create(tCtx, newCronJob(cronJobName, ns.Name, "* * * * ?"), metav1.CreateOptions{})
172 if err != nil {
173 t.Fatalf("Failed to create CronJob: %v", err)
174 }
175 defer cleanupCronJobs(t, cjClient, cronJobName)
176
177 validateJobAndPod(t, clientSet, namespaceName)
178 }
179
View as plain text