...

Source file src/k8s.io/kubernetes/test/e2e/apps/job.go

Documentation: k8s.io/kubernetes/test/e2e/apps

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apps
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"strconv"
    24  	"time"
    25  
    26  	batchv1 "k8s.io/api/batch/v1"
    27  	v1 "k8s.io/api/core/v1"
    28  	policyv1 "k8s.io/api/policy/v1"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	"k8s.io/apimachinery/pkg/api/resource"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	"k8s.io/apimachinery/pkg/runtime/schema"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	utilrand "k8s.io/apimachinery/pkg/util/rand"
    37  	"k8s.io/apimachinery/pkg/util/sets"
    38  	"k8s.io/apimachinery/pkg/util/wait"
    39  	"k8s.io/apimachinery/pkg/watch"
    40  	clientset "k8s.io/client-go/kubernetes"
    41  	"k8s.io/client-go/tools/cache"
    42  	watchtools "k8s.io/client-go/tools/watch"
    43  	"k8s.io/client-go/util/retry"
    44  	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
    45  	"k8s.io/kubernetes/test/e2e/framework"
    46  	e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
    47  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    48  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    49  	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
    50  	"k8s.io/kubernetes/test/e2e/scheduling"
    51  	admissionapi "k8s.io/pod-security-admission/api"
    52  	"k8s.io/utils/pointer"
    53  	"k8s.io/utils/ptr"
    54  
    55  	"github.com/onsi/ginkgo/v2"
    56  	"github.com/onsi/gomega"
    57  )
    58  
    59  type watchEventConfig struct {
    60  	framework           *framework.Framework
    61  	resourceVersion     string
    62  	w                   *cache.ListWatch
    63  	jobName             string
    64  	watchEvent          watch.EventType
    65  	extJob              *batchv1.Job
    66  	updatedMetadataType string
    67  	updatedKey          string
    68  	updatedValue        string
    69  }
    70  
    71  var _ = SIGDescribe("Job", func() {
    72  	f := framework.NewDefaultFramework("job")
    73  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    74  
    75  	// Simplest case: N pods succeed
    76  	ginkgo.It("should run a job to completion when tasks succeed", func(ctx context.Context) {
    77  		parallelism := int32(2)
    78  		completions := int32(4)
    79  		backoffLimit := int32(6) // default value
    80  
    81  		ginkgo.By("Creating a job")
    82  		job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
    83  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
    84  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
    85  
    86  		ginkgo.By("Ensuring job reaches completions")
    87  		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
    88  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
    89  
    90  		ginkgo.By("Ensuring pods for job exist")
    91  		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
    92  		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
    93  		successes := int32(0)
    94  		for _, pod := range pods.Items {
    95  			if pod.Status.Phase == v1.PodSucceeded {
    96  				successes++
    97  			}
    98  		}
    99  		gomega.Expect(successes).To(gomega.Equal(completions), "expected %d successful job pods, but got  %d", completions, successes)
   100  	})
   101  
   102  	ginkgo.It("should allow to use the pod failure policy on exit code to fail the job early", func(ctx context.Context) {
   103  
   104  		// We fail the Job's pod only once to ensure the backoffLimit is not
   105  		// reached and thus the job is failed due to the pod failure policy
   106  		// with FailJob action.
   107  		// In order to ensure a Job's pod fails once before succeeding we force
   108  		// the Job's Pods to be scheduled to a single Node and use a hostPath
   109  		// volume to persist data across new Pods.
   110  		ginkgo.By("Looking for a node to schedule job pod")
   111  		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
   112  		framework.ExpectNoError(err)
   113  
   114  		parallelism := int32(2)
   115  		completions := int32(4)
   116  		backoffLimit := int32(6) // default value
   117  		ginkgo.By("Creating a job")
   118  		job := e2ejob.NewTestJobOnNode("failOnce", "pod-failure-failjob", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
   119  		job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
   120  			Rules: []batchv1.PodFailurePolicyRule{
   121  				{
   122  					Action: batchv1.PodFailurePolicyActionFailJob,
   123  					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
   124  						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
   125  						Values:   []int32{1},
   126  					},
   127  				},
   128  			},
   129  		}
   130  		job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   131  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   132  
   133  		ginkgo.By("Ensuring job fails")
   134  		err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
   135  		framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name)
   136  	})
   137  
   138  	ginkgo.It("should allow to use the pod failure policy to not count the failure towards the backoffLimit", func(ctx context.Context) {
   139  
   140  		// We set the backoffLimit to 0 so that any pod failure would trigger
   141  		// job failure if not for the pod failure policy to ignore the failed
   142  		// pods from counting them towards the backoffLimit. Also, we fail the
   143  		// pod only once so that the job eventually succeeds.
   144  		// In order to ensure a Job's pod fails once before succeeding we force
   145  		// the Job's Pods to be scheduled to a single Node and use a hostPath
   146  		// volume to persist data across new Pods.
   147  		parallelism := int32(2)
   148  		completions := int32(4)
   149  		backoffLimit := int32(0)
   150  
   151  		ginkgo.By("Looking for a node to schedule job pod")
   152  		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
   153  		framework.ExpectNoError(err)
   154  
   155  		ginkgo.By("Creating a job")
   156  		job := e2ejob.NewTestJobOnNode("failOnce", "pod-failure-ignore", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
   157  		job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
   158  			Rules: []batchv1.PodFailurePolicyRule{
   159  				{
   160  					Action: batchv1.PodFailurePolicyActionIgnore,
   161  					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
   162  						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
   163  						Values:   []int32{1},
   164  					},
   165  				},
   166  			},
   167  		}
   168  		job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   169  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   170  
   171  		ginkgo.By("Ensuring job reaches completions")
   172  		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
   173  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   174  	})
   175  
   176  	// This test is using an indexed job. The pod corresponding to the 0th index
   177  	// creates a marker file on the host and runs 'forever' until evicted. We use
   178  	// the non-0-indexed pods to determine if the marker file is already
   179  	// created by the 0th indexed pod - the non-0-indexed pods fail and restart
   180  	// until the marker file is created (their potential failures are ignored
   181  	// based on the exit code). Once the marker file is created the 0th indexed
   182  	// pod is evicted (DisruptionTarget condition is added in the process),
   183  	// after restart it runs to successful completion.
   184  	// Steps:
   185  	// 1. Select a node to run all Job's pods to ensure the host marker file is accessible by all pods
   186  	// 2. Create the indexed job
   187  	// 3. Await for all non-0-indexed pods to succeed to ensure the marker file is created by the 0-indexed pod
   188  	// 4. Make sure the 0-indexed pod is running
   189  	// 5. Evict the 0-indexed pod
   190  	// 6. Await for the job to successfully complete
   191  	ginkgo.DescribeTable("Using a pod failure policy to not count some failures towards the backoffLimit",
   192  		func(ctx context.Context, policy *batchv1.PodFailurePolicy) {
   193  			mode := batchv1.IndexedCompletion
   194  
   195  			// We set the backoffLimit to 0 so that any pod failure would trigger
   196  			// job failure if not for the pod failure policy to ignore the failed
   197  			// pods from counting them towards the backoffLimit.
   198  			parallelism := int32(2)
   199  			completions := int32(4)
   200  			backoffLimit := int32(0)
   201  
   202  			ginkgo.By("Looking for a node to schedule job pods")
   203  			node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
   204  			framework.ExpectNoError(err)
   205  
   206  			ginkgo.By("Creating a job")
   207  			job := e2ejob.NewTestJobOnNode("notTerminateOnce", "pod-disruption-failure-ignore", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
   208  			job.Spec.CompletionMode = &mode
   209  			job.Spec.PodFailurePolicy = policy
   210  			job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   211  			framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   212  
   213  			ginkgo.By("Awaiting for all non 0-indexed pods to succeed to ensure the marker file is created")
   214  			err = e2ejob.WaitForJobPodsSucceeded(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions-1)
   215  			framework.ExpectNoError(err, "failed to await for all non 0-indexed pods to succeed for job: %s/%s", job.Name, job.Namespace)
   216  
   217  			ginkgo.By("Awaiting for the 0-indexed pod to be running")
   218  			err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, 1)
   219  			framework.ExpectNoError(err, "failed to await for the 0-indexed pod to be running for the job: %s/%s", job.Name, job.Namespace)
   220  
   221  			pods, err := e2ejob.GetAllRunningJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   222  			framework.ExpectNoError(err, "failed to get running pods for the job: %s/%s", job.Name, job.Namespace)
   223  			gomega.Expect(pods).To(gomega.HaveLen(1), "Exactly one running pod is expected")
   224  			pod := pods[0]
   225  			ginkgo.By(fmt.Sprintf("Evicting the running pod: %s/%s", pod.Name, pod.Namespace))
   226  			evictTarget := &policyv1.Eviction{
   227  				ObjectMeta: metav1.ObjectMeta{
   228  					Name:      pod.Name,
   229  					Namespace: pod.Namespace,
   230  				},
   231  			}
   232  			f.ClientSet.CoreV1().Pods(pod.Namespace).EvictV1(context.TODO(), evictTarget)
   233  			framework.ExpectNoError(err, "failed to evict the pod: %s/%s", pod.Name, pod.Namespace)
   234  
   235  			ginkgo.By(fmt.Sprintf("Awaiting for the pod: %s/%s to be deleted", pod.Name, pod.Namespace))
   236  			err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)
   237  			framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace)
   238  
   239  			ginkgo.By("Ensuring job reaches completions")
   240  			err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
   241  			framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   242  		},
   243  		ginkgo.Entry("Ignore DisruptionTarget condition", &batchv1.PodFailurePolicy{
   244  			Rules: []batchv1.PodFailurePolicyRule{
   245  				{
   246  					// Ignore failures of the non 0-indexed pods which fail until the marker file is created
   247  					Action: batchv1.PodFailurePolicyActionIgnore,
   248  					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
   249  						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
   250  						Values:   []int32{1},
   251  					},
   252  				},
   253  				{
   254  					// Ignore the pod failure caused by the eviction
   255  					Action: batchv1.PodFailurePolicyActionIgnore,
   256  					OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
   257  						{
   258  							Type:   v1.DisruptionTarget,
   259  							Status: v1.ConditionTrue,
   260  						},
   261  					},
   262  				},
   263  			},
   264  		}),
   265  		ginkgo.Entry("Ignore exit code 137", &batchv1.PodFailurePolicy{
   266  			Rules: []batchv1.PodFailurePolicyRule{
   267  				{
   268  					// Ignore failures of the non 0-indexed pods which fail until the marker file is created
   269  					// And the 137 in the 0-indexed pod due to eviction.
   270  					Action: batchv1.PodFailurePolicyActionIgnore,
   271  					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
   272  						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
   273  						Values:   []int32{1, 137},
   274  					},
   275  				},
   276  			},
   277  		}),
   278  	)
   279  
   280  	ginkgo.It("should not create pods when created in suspend state", func(ctx context.Context) {
   281  		parallelism := int32(2)
   282  		completions := int32(4)
   283  		backoffLimit := int32(6) // default value
   284  
   285  		ginkgo.By("Creating a job with suspend=true")
   286  		job := e2ejob.NewTestJob("succeed", "suspend-true-to-false", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   287  		job.Spec.Suspend = pointer.BoolPtr(true)
   288  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   289  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   290  
   291  		ginkgo.By("Checking Job status to observe Suspended state")
   292  		err = e2ejob.WaitForJobSuspend(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   293  		framework.ExpectNoError(err, "failed to observe suspend state: %s", f.Namespace.Name)
   294  
   295  		ginkgo.By("Ensuring pods aren't created for job")
   296  		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   297  		framework.ExpectNoError(err, "failed to list pod for a given job %s in namespace %s", job.Name, f.Namespace.Name)
   298  		gomega.Expect(pods.Items).To(gomega.BeEmpty())
   299  
   300  		ginkgo.By("Updating the job with suspend=false")
   301  		job, err = f.ClientSet.BatchV1().Jobs(f.Namespace.Name).Get(ctx, job.Name, metav1.GetOptions{})
   302  		framework.ExpectNoError(err, "failed to get job in namespace: %s", f.Namespace.Name)
   303  		job.Spec.Suspend = pointer.BoolPtr(false)
   304  		job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   305  		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)
   306  
   307  		ginkgo.By("Waiting for job to complete")
   308  		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
   309  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   310  	})
   311  
   312  	ginkgo.It("should delete pods when suspended", func(ctx context.Context) {
   313  		parallelism := int32(2)
   314  		completions := int32(4)
   315  		backoffLimit := int32(6) // default value
   316  
   317  		ginkgo.By("Creating a job with suspend=false")
   318  		job := e2ejob.NewTestJob("notTerminate", "suspend-false-to-true", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   319  		job.Spec.Suspend = pointer.Bool(false)
   320  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   321  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   322  
   323  		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
   324  		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
   325  		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
   326  
   327  		ginkgo.By("Updating the job with suspend=true")
   328  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   329  			job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   330  			framework.ExpectNoError(err, "unable to get job %s in namespace %s", job.Name, f.Namespace.Name)
   331  			job.Spec.Suspend = pointer.Bool(true)
   332  			updatedJob, err := e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   333  			if err == nil {
   334  				job = updatedJob
   335  			}
   336  			return err
   337  		})
   338  		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)
   339  
   340  		ginkgo.By("Ensuring pods are deleted")
   341  		err = e2ejob.WaitForAllJobPodsGone(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   342  		framework.ExpectNoError(err, "failed to ensure pods are deleted after suspend=true")
   343  
   344  		ginkgo.By("Checking Job status to observe Suspended state")
   345  		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   346  		framework.ExpectNoError(err, "failed to retrieve latest job object")
   347  		exists := false
   348  		for _, c := range job.Status.Conditions {
   349  			if c.Type == batchv1.JobSuspended {
   350  				exists = true
   351  				break
   352  			}
   353  		}
   354  		if !exists {
   355  			framework.Failf("Job was expected to be completed or failed")
   356  		}
   357  	})
   358  
   359  	ginkgo.It("should recreate pods only after they have failed if pod replacement policy is set to Failed", func(ctx context.Context) {
   360  		ginkgo.By("Creating a job")
   361  		job := e2ejob.NewTestJob("", "pod-recreate-failed", v1.RestartPolicyNever, 1, 1, nil, 1)
   362  		job.Spec.PodReplacementPolicy = ptr.To(batchv1.Failed)
   363  		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", `_term(){
   364  	sleep 5
   365  	exit 143
   366  }
   367  trap _term SIGTERM
   368  while true; do
   369  	sleep 1
   370  done`}
   371  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   372  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   373  
   374  		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, 1)
   375  		framework.ExpectNoError(err, "failed to wait for job pod to become running in namespace: %s", f.Namespace.Name)
   376  
   377  		ginkgo.By("Deleting job pod")
   378  		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   379  		framework.ExpectNoError(err, "failed to get pod list for job %s in namespace: %s", job.Name, f.Namespace.Name)
   380  
   381  		framework.ExpectNoError(e2epod.DeletePodsWithGracePeriod(ctx, f.ClientSet, pods.Items, 30), "failed to delete pods in namespace: %s", f.Namespace.Name)
   382  
   383  		ginkgo.By("Ensuring pod does not get recreated while it is in terminating state")
   384  		err = e2ejob.WaitForJobState(ctx, f.ClientSet, f.Namespace.Name, job.Name, f.Timeouts.PodDelete, func(job *batchv1.Job) string {
   385  			if job.Status.Active == 0 && job.Status.Failed == 0 && *job.Status.Terminating == 1 {
   386  				return ""
   387  			} else {
   388  				return fmt.Sprintf(
   389  					"expected job to have 0 active pod, 0 failed pod and 1 terminating pods, but got %d active pods, %d failed pods and %d terminating pods",
   390  					job.Status.Active,
   391  					job.Status.Failed,
   392  					*job.Status.Terminating,
   393  				)
   394  			}
   395  		})
   396  		framework.ExpectNoError(err, "failed to ensure pod is not recreated while it is in terminating state")
   397  
   398  		ginkgo.By("Ensuring pod gets recreated after it has failed")
   399  		err = e2ejob.WaitForJobState(ctx, f.ClientSet, f.Namespace.Name, job.Name, f.Timeouts.PodDelete, func(job *batchv1.Job) string {
   400  			if job.Status.Active == 1 && job.Status.Failed == 1 && *job.Status.Terminating == 0 {
   401  				return ""
   402  			} else {
   403  				return fmt.Sprintf(
   404  					"expected job to have 1 active pods, 1 failed pods and 0 terminating pod, but got %d active pods, %d failed pods and %d terminating pods",
   405  					job.Status.Active,
   406  					job.Status.Failed,
   407  					*job.Status.Terminating,
   408  				)
   409  			}
   410  		})
   411  		framework.ExpectNoError(err, "failed to wait for pod to get recreated")
   412  	})
   413  
   414  	/*
   415  		  Release: v1.24
   416  			Testname: Ensure Pods of an Indexed Job get a unique index.
   417  			Description: Create an Indexed job. Job MUST complete successfully.
   418  			Ensure that created pods have completion index annotation and environment variable.
   419  	*/
   420  	framework.ConformanceIt("should create pods for an Indexed job with completion indexes and specified hostname", func(ctx context.Context) {
   421  		parallelism := int32(2)
   422  		completions := int32(4)
   423  		backoffLimit := int32(6) // default value
   424  
   425  		ginkgo.By("Creating Indexed job")
   426  		job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   427  		mode := batchv1.IndexedCompletion
   428  		job.Spec.CompletionMode = &mode
   429  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   430  		framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name)
   431  
   432  		ginkgo.By("Ensuring job reaches completions")
   433  		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
   434  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   435  
   436  		ginkgo.By("Ensuring pods with index for job exist")
   437  		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   438  		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
   439  		succeededIndexes := sets.NewInt()
   440  		for _, pod := range pods.Items {
   441  			if pod.Status.Phase == v1.PodSucceeded && pod.Annotations != nil {
   442  				ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotation])
   443  				framework.ExpectNoError(err, "failed obtaining completion index from pod in namespace: %s", f.Namespace.Name)
   444  				succeededIndexes.Insert(ix)
   445  				expectedName := fmt.Sprintf("%s-%d", job.Name, ix)
   446  				gomega.Expect(pod.Spec.Hostname).To(gomega.Equal(expectedName), "expected completed pod with hostname %s, but got %s", expectedName, pod.Spec.Hostname)
   447  			}
   448  		}
   449  		gotIndexes := succeededIndexes.List()
   450  		wantIndexes := []int{0, 1, 2, 3}
   451  		gomega.Expect(gotIndexes).To(gomega.Equal(wantIndexes), "expected completed indexes %s, but got %s", wantIndexes, gotIndexes)
   452  	})
   453  
   454  	/*
   455  		Testcase: Ensure that all indexes are executed for an indexed job with backoffLimitPerIndex despite some failing
   456  		Description: Create an indexed job and ensure that all indexes are either failed or succeeded, depending
   457  		on the end state of the corresponding pods. Pods with odd indexes fail, while the pods with even indexes
   458  		succeeded. Also, verify that the number of failed pods doubles the number of failing indexes, as the
   459  		backoffLimitPerIndex=1, allowing for one pod recreation before marking that indexed failed.
   460  	*/
   461  	ginkgo.It("should execute all indexes despite some failing when using backoffLimitPerIndex", func(ctx context.Context) {
   462  		parallelism := int32(2)
   463  		completions := int32(4)
   464  		backoffLimit := int32(6) // default value
   465  
   466  		ginkgo.By("Creating an indexed job with backoffLimit per index and failing pods")
   467  		job := e2ejob.NewTestJob("failOddSucceedEven", "with-backoff-limit-per-index", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   468  		job.Spec.BackoffLimit = nil
   469  		job.Spec.BackoffLimitPerIndex = ptr.To[int32](1)
   470  		mode := batchv1.IndexedCompletion
   471  		job.Spec.CompletionMode = &mode
   472  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   473  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   474  
   475  		ginkgo.By("Awaiting for the job to fail as there are failed indexes")
   476  		err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
   477  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   478  
   479  		ginkgo.By("Verifying the Job status fields to ensure all indexes were executed")
   480  		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   481  		framework.ExpectNoError(err, "failed to retrieve latest job object")
   482  		gomega.Expect(job.Status.FailedIndexes).Should(gomega.HaveValue(gomega.Equal("1,3")))
   483  		gomega.Expect(job.Status.CompletedIndexes).Should(gomega.Equal("0,2"))
   484  		gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(4)))
   485  		gomega.Expect(job.Status.Succeeded).Should(gomega.Equal(int32(2)))
   486  	})
   487  
   488  	/*
   489  		Testcase: Terminate job execution when the maxFailedIndexes is exceeded
   490  		Description: Create an indexed job with backoffLimitPerIndex and maxFailedIndexes.
   491  		Verify the job execution is terminated as soon as the number of failed
   492  		indexes exceeds maxFailedIndexes.
   493  	*/
   494  	ginkgo.It("should terminate job execution when the number of failed indexes exceeds maxFailedIndexes", func(ctx context.Context) {
   495  		// we use parallelism=1 to make sure in the asserts only one pod was created
   496  		parallelism := int32(1)
   497  		completions := int32(4)
   498  		backoffLimit := int32(6) // default value
   499  
   500  		ginkgo.By("Creating an indexed job with backoffLimit per index and maxFailedIndexes")
   501  		job := e2ejob.NewTestJob("fail", "with-max-failed-indexes", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   502  		job.Spec.BackoffLimit = nil
   503  		job.Spec.BackoffLimitPerIndex = ptr.To[int32](0)
   504  		job.Spec.MaxFailedIndexes = ptr.To[int32](0)
   505  
   506  		mode := batchv1.IndexedCompletion
   507  		job.Spec.CompletionMode = &mode
   508  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   509  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   510  
   511  		ginkgo.By("Awaiting for the job to fail as the number of max failed indexes is exceeded")
   512  		err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
   513  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   514  
   515  		ginkgo.By("Verifying the Job status fields to ensure early termination of the job")
   516  		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   517  		framework.ExpectNoError(err, "failed to retrieve latest job object")
   518  		gomega.Expect(job.Status.FailedIndexes).Should(gomega.HaveValue(gomega.Equal("0")))
   519  		gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(1)))
   520  	})
   521  
   522  	/*
   523  		Testcase: Mark indexes as failed when the FailIndex action is matched in podFailurePolicy
   524  		Description: Create an indexed job with backoffLimitPerIndex, and podFailurePolicy
   525  		with the FailIndex action. Verify the failed pods matching the pod failure policy
   526  		result in marking the corresponding indexes as failed without restarts, despite
   527  		backoffLimitPerIndex > 0.
   528  	*/
   529  	ginkgo.It("should mark indexes as failed when the FailIndex action is matched in podFailurePolicy", func(ctx context.Context) {
   530  		parallelism := int32(2)
   531  		completions := int32(2)
   532  		backoffLimit := int32(6) // default value
   533  
   534  		ginkgo.By("Creating an indexed job with failing pods matching the FailIndex action")
   535  		job := e2ejob.NewTestJob("failOddSucceedEven", "matching-fail-index-action", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   536  		job.Spec.BackoffLimit = nil
   537  		job.Spec.BackoffLimitPerIndex = ptr.To[int32](1)
   538  		job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
   539  			Rules: []batchv1.PodFailurePolicyRule{
   540  				{
   541  					Action: batchv1.PodFailurePolicyActionFailIndex,
   542  					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
   543  						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
   544  						Values:   []int32{1},
   545  					},
   546  				},
   547  			},
   548  		}
   549  		mode := batchv1.IndexedCompletion
   550  		job.Spec.CompletionMode = &mode
   551  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   552  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   553  
   554  		ginkgo.By("Awaiting for the job to fail as all indexes are failed")
   555  		err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
   556  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   557  
   558  		ginkgo.By("Verifying the Job status fields to ensure the upper indexes didn't execute")
   559  		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   560  		framework.ExpectNoError(err, "failed to retrieve latest job object")
   561  		gomega.Expect(job.Status.FailedIndexes).Should(gomega.HaveValue(gomega.Equal("1")))
   562  		gomega.Expect(job.Status.CompletedIndexes).Should(gomega.Equal("0"))
   563  		gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(1)))
   564  		gomega.Expect(job.Status.Succeeded).Should(gomega.Equal(int32(1)))
   565  	})
   566  
   567  	/*
   568  		Testcase: Ensure that the pods associated with the job are removed once the job is deleted
   569  		Description: Create a job and ensure the associated pod count is equal to parallelism count. Delete the
   570  		job and ensure if the pods associated with the job have been removed
   571  	*/
   572  	ginkgo.It("should remove pods when job is deleted", func(ctx context.Context) {
   573  		parallelism := int32(2)
   574  		completions := int32(4)
   575  		backoffLimit := int32(6) // default value
   576  
   577  		ginkgo.By("Creating a job")
   578  		job := e2ejob.NewTestJob("notTerminate", "all-pods-removed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   579  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   580  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   581  
   582  		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
   583  		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
   584  		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
   585  
   586  		ginkgo.By("Delete the job")
   587  		err = e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name)
   588  		framework.ExpectNoError(err, "failed to delete the job in namespace: %s", f.Namespace.Name)
   589  
   590  		ginkgo.By("Ensure the pods associated with the job are also deleted")
   591  		err = e2ejob.WaitForAllJobPodsGone(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   592  		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
   593  	})
   594  
   595  	/*
   596  		Release: v1.16
   597  		Testname: Jobs, completion after task failure
   598  		Description: Explicitly cause the tasks to fail once initially. After restarting, the Job MUST
   599  		execute to completion.
   600  	*/
   601  	framework.ConformanceIt("should run a job to completion when tasks sometimes fail and are locally restarted", func(ctx context.Context) {
   602  		parallelism := int32(2)
   603  		completions := int32(4)
   604  		backoffLimit := int32(6) // default value
   605  
   606  		ginkgo.By("Creating a job")
   607  		// One failure, then a success, local restarts.
   608  		// We can't use the random failure approach, because kubelet will
   609  		// throttle frequently failing containers in a given pod, ramping
   610  		// up to 5 minutes between restarts, making test timeout due to
   611  		// successive failures too likely with a reasonable test timeout.
   612  		job := e2ejob.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit)
   613  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   614  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   615  
   616  		ginkgo.By("Ensuring job reaches completions")
   617  		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
   618  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   619  	})
   620  
   621  	// Pods sometimes fail, but eventually succeed, after pod restarts
   622  	ginkgo.It("should run a job to completion when tasks sometimes fail and are not locally restarted", func(ctx context.Context) {
   623  		// One failure, then a success, no local restarts.
   624  		// We can't use the random failure approach, because JobController
   625  		// will throttle frequently failing Pods of a given Job, ramping
   626  		// up to 6 minutes between restarts, making test timeout due to
   627  		// successive failures.
   628  		// Instead, we force the Job's Pods to be scheduled to a single Node
   629  		// and use a hostPath volume to persist data across new Pods.
   630  		ginkgo.By("Looking for a node to schedule job pod")
   631  		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
   632  		framework.ExpectNoError(err)
   633  
   634  		parallelism := int32(2)
   635  		completions := int32(4)
   636  		backoffLimit := int32(6) // default value
   637  
   638  		ginkgo.By("Creating a job")
   639  		job := e2ejob.NewTestJobOnNode("failOnce", "fail-once-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
   640  		job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   641  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   642  
   643  		ginkgo.By("Ensuring job reaches completions")
   644  		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions)
   645  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   646  	})
   647  
   648  	ginkgo.It("should fail when exceeds active deadline", func(ctx context.Context) {
   649  		activeDeadlineSeconds := int64(1)
   650  		parallelism := int32(2)
   651  		completions := int32(4)
   652  		backoffLimit := int32(6) // default value
   653  
   654  		ginkgo.By("Creating a job")
   655  		job := e2ejob.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
   656  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   657  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   658  		ginkgo.By("Ensuring job past active deadline")
   659  		err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+15)*time.Second, "DeadlineExceeded")
   660  		framework.ExpectNoError(err, "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name)
   661  	})
   662  
   663  	/*
   664  		Release: v1.15
   665  		Testname: Jobs, active pods, graceful termination
   666  		Description: Create a job. Ensure the active pods reflect parallelism in the namespace and delete the job. Job MUST be deleted successfully.
   667  	*/
   668  	framework.ConformanceIt("should delete a job", func(ctx context.Context) {
   669  		parallelism := int32(2)
   670  		completions := int32(4)
   671  		backoffLimit := int32(6) // default value
   672  
   673  		ginkgo.By("Creating a job")
   674  		job := e2ejob.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   675  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   676  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   677  
   678  		ginkgo.By("Ensuring active pods == parallelism")
   679  		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
   680  		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
   681  
   682  		ginkgo.By("delete a job")
   683  		framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
   684  
   685  		ginkgo.By("Ensuring job was deleted")
   686  		_, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   687  		gomega.Expect(err).To(gomega.MatchError(apierrors.IsNotFound, fmt.Sprintf("failed to ensure job %s was deleted in namespace: %s", job.Name, f.Namespace.Name)))
   688  	})
   689  
   690  	/*
   691  		Release: v1.16
   692  		Testname: Jobs, orphan pods, re-adoption
   693  		Description: Create a parallel job. The number of Pods MUST equal the level of parallelism.
   694  		Orphan a Pod by modifying its owner reference. The Job MUST re-adopt the orphan pod.
   695  		Modify the labels of one of the Job's Pods. The Job MUST release the Pod.
   696  	*/
   697  	framework.ConformanceIt("should adopt matching orphans and release non-matching pods", func(ctx context.Context) {
   698  		parallelism := int32(2)
   699  		completions := int32(4)
   700  		backoffLimit := int32(6) // default value
   701  
   702  		ginkgo.By("Creating a job")
   703  		job := e2ejob.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   704  		// Replace job with the one returned from Create() so it has the UID.
   705  		// Save Kind since it won't be populated in the returned job.
   706  		kind := job.Kind
   707  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   708  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   709  		job.Kind = kind
   710  
   711  		ginkgo.By("Ensuring active pods == parallelism")
   712  		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
   713  		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
   714  
   715  		ginkgo.By("Orphaning one of the Job's Pods")
   716  		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   717  		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
   718  		gomega.Expect(pods.Items).To(gomega.HaveLen(int(parallelism)))
   719  		pod := pods.Items[0]
   720  		e2epod.NewPodClient(f).Update(ctx, pod.Name, func(pod *v1.Pod) {
   721  			pod.OwnerReferences = nil
   722  		})
   723  
   724  		ginkgo.By("Checking that the Job readopts the Pod")
   725  		gomega.Expect(e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "adopted", e2ejob.JobTimeout,
   726  			func(pod *v1.Pod) (bool, error) {
   727  				controllerRef := metav1.GetControllerOf(pod)
   728  				if controllerRef == nil {
   729  					return false, nil
   730  				}
   731  				if controllerRef.Kind != job.Kind || controllerRef.Name != job.Name || controllerRef.UID != job.UID {
   732  					return false, fmt.Errorf("pod has wrong controllerRef: got %v, want %v", controllerRef, job)
   733  				}
   734  				return true, nil
   735  			},
   736  		)).To(gomega.Succeed(), "wait for pod %q to be readopted", pod.Name)
   737  
   738  		ginkgo.By("Removing the labels from the Job's Pod")
   739  		e2epod.NewPodClient(f).Update(ctx, pod.Name, func(pod *v1.Pod) {
   740  			pod.Labels = nil
   741  		})
   742  
   743  		ginkgo.By("Checking that the Job releases the Pod")
   744  		gomega.Expect(e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "released", e2ejob.JobTimeout,
   745  			func(pod *v1.Pod) (bool, error) {
   746  				controllerRef := metav1.GetControllerOf(pod)
   747  				if controllerRef != nil {
   748  					return false, nil
   749  				}
   750  				return true, nil
   751  			},
   752  		)).To(gomega.Succeed(), "wait for pod %q to be released", pod.Name)
   753  	})
   754  
   755  	ginkgo.It("should fail to exceed backoffLimit", func(ctx context.Context) {
   756  		ginkgo.By("Creating a job")
   757  		backoff := 1
   758  		job := e2ejob.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff))
   759  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   760  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   761  		ginkgo.By("Ensuring job exceed backofflimit")
   762  
   763  		err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, e2ejob.JobTimeout, "BackoffLimitExceeded")
   764  		framework.ExpectNoError(err, "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name)
   765  
   766  		ginkgo.By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1))
   767  		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   768  		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
   769  		gomega.Expect(pods.Items).To(gomega.HaveLen(backoff + 1))
   770  		for _, pod := range pods.Items {
   771  			gomega.Expect(pod.Status.Phase).To(gomega.Equal(v1.PodFailed))
   772  		}
   773  	})
   774  
   775  	f.It("should run a job to completion with CPU requests", f.WithSerial(), func(ctx context.Context) {
   776  		ginkgo.By("Creating a job that with CPU requests")
   777  
   778  		testNodeName := scheduling.GetNodeThatCanRunPod(ctx, f)
   779  		targetNode, err := f.ClientSet.CoreV1().Nodes().Get(ctx, testNodeName, metav1.GetOptions{})
   780  		framework.ExpectNoError(err, "unable to get node object for node %v", testNodeName)
   781  
   782  		cpu, ok := targetNode.Status.Allocatable[v1.ResourceCPU]
   783  		if !ok {
   784  			framework.Failf("Unable to get node's %q cpu", targetNode.Name)
   785  		}
   786  
   787  		cpuRequest := fmt.Sprint(int64(0.2 * float64(cpu.Value())))
   788  
   789  		parallelism := int32(90)
   790  		completions := int32(90)
   791  		backoffLimit := int32(0)
   792  
   793  		ginkgo.By("Creating a job")
   794  		job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   795  		for i := range job.Spec.Template.Spec.Containers {
   796  			job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{
   797  				Requests: v1.ResourceList{
   798  					v1.ResourceCPU: resource.MustParse(cpuRequest),
   799  				},
   800  			}
   801  			job.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": testNodeName}
   802  		}
   803  
   804  		framework.Logf("Creating job %q with a node hostname selector %q with cpu request %q", job.Name, testNodeName, cpuRequest)
   805  		job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   806  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   807  
   808  		ginkgo.By("Ensuring job reaches completions")
   809  		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
   810  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
   811  
   812  		ginkgo.By("Ensuring pods for job exist")
   813  		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
   814  		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
   815  		successes := int32(0)
   816  		for _, pod := range pods.Items {
   817  			if pod.Status.Phase == v1.PodSucceeded {
   818  				successes++
   819  			}
   820  		}
   821  		gomega.Expect(successes).To(gomega.Equal(completions), "expected %d successful job pods, but got  %d", completions, successes)
   822  	})
   823  
   824  	/*
   825  		Release: v1.24
   826  		Testname: Jobs, apply changes to status
   827  		Description: Attempt to create a running Job which MUST succeed.
   828  		Attempt to patch the Job status which MUST succeed.
   829  		An annotation for the job that was patched MUST be found.
   830  		Attempt to replace the job status with update which MUST succeed.
   831  		Attempt to read its status sub-resource which MUST succeed
   832  	*/
   833  	framework.ConformanceIt("should apply changes to a job status", func(ctx context.Context) {
   834  
   835  		ns := f.Namespace.Name
   836  		jClient := f.ClientSet.BatchV1().Jobs(ns)
   837  
   838  		parallelism := int32(2)
   839  		completions := int32(4)
   840  		backoffLimit := int32(6) // default value
   841  
   842  		ginkgo.By("Creating a job")
   843  		job := e2ejob.NewTestJob("notTerminate", "suspend-false-to-true", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   844  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
   845  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
   846  
   847  		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
   848  		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
   849  		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
   850  
   851  		customConditionType := batchv1.JobConditionType("CustomConditionType")
   852  		// /status subresource operations
   853  		ginkgo.By("patching /status")
   854  		// we need to use RFC3339 version since conversion over the wire cuts nanoseconds
   855  		now1 := metav1.Now().Rfc3339Copy()
   856  		jStatus := batchv1.JobStatus{
   857  			Conditions: []batchv1.JobCondition{
   858  				{
   859  					Type:               customConditionType,
   860  					Status:             v1.ConditionTrue,
   861  					LastTransitionTime: now1,
   862  				},
   863  			},
   864  		}
   865  
   866  		jStatusJSON, err := json.Marshal(jStatus)
   867  		framework.ExpectNoError(err)
   868  		patchedStatus, err := jClient.Patch(ctx, job.Name, types.MergePatchType,
   869  			[]byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":`+string(jStatusJSON)+`}`),
   870  			metav1.PatchOptions{}, "status")
   871  		framework.ExpectNoError(err)
   872  		if condition := findConditionByType(patchedStatus.Status.Conditions, customConditionType); condition != nil {
   873  			if !condition.LastTransitionTime.Equal(&now1) {
   874  				framework.Failf("patched object should have the applied condition with LastTransitionTime %#v, got %#v instead", now1, condition.LastTransitionTime)
   875  			}
   876  		} else {
   877  			framework.Failf("patched object does not have the required condition %v", customConditionType)
   878  		}
   879  		gomega.Expect(patchedStatus.Annotations).To(gomega.HaveKeyWithValue("patchedstatus", "true"), "patched object should have the applied annotation")
   880  
   881  		ginkgo.By("updating /status")
   882  		// we need to use RFC3339 version since conversion over the wire cuts nanoseconds
   883  		now2 := metav1.Now().Rfc3339Copy()
   884  		var statusToUpdate, updatedStatus *batchv1.Job
   885  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   886  			statusToUpdate, err = jClient.Get(ctx, job.Name, metav1.GetOptions{})
   887  			if err != nil {
   888  				return err
   889  			}
   890  			if condition := findConditionByType(statusToUpdate.Status.Conditions, customConditionType); condition != nil {
   891  				condition.LastTransitionTime = now2
   892  			} else {
   893  				framework.Failf("patched object does not have the required condition %v", customConditionType)
   894  			}
   895  			updatedStatus, err = jClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
   896  			return err
   897  		})
   898  		framework.ExpectNoError(err)
   899  		if condition := findConditionByType(updatedStatus.Status.Conditions, customConditionType); condition != nil {
   900  			if !condition.LastTransitionTime.Equal(&now2) {
   901  				framework.Failf("patched object should have the applied condition with LastTransitionTime %#v, got %#v instead", now2, condition.LastTransitionTime)
   902  			}
   903  		} else {
   904  			framework.Failf("patched object does not have the required condition %v", customConditionType)
   905  		}
   906  
   907  		ginkgo.By("get /status")
   908  		jResource := schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}
   909  		gottenStatus, err := f.DynamicClient.Resource(jResource).Namespace(ns).Get(ctx, job.Name, metav1.GetOptions{}, "status")
   910  		framework.ExpectNoError(err)
   911  		statusUID, _, err := unstructured.NestedFieldCopy(gottenStatus.Object, "metadata", "uid")
   912  		framework.ExpectNoError(err)
   913  		gomega.Expect(string(job.UID)).To(gomega.Equal(statusUID), fmt.Sprintf("job.UID: %v expected to match statusUID: %v ", job.UID, statusUID))
   914  	})
   915  
   916  	/*
   917  		Release: v1.25
   918  		Testname: Jobs, manage lifecycle
   919  		Description: Attempt to create a suspended Job which MUST succeed.
   920  		Attempt to patch the Job to include a new label which MUST succeed.
   921  		The label MUST be found. Attempt to replace the Job to include a
   922  		new annotation which MUST succeed. The annotation MUST be found.
   923  		Attempt to list all namespaces with a label selector which MUST
   924  		succeed. One list MUST be found. It MUST succeed at deleting a
   925  		collection of jobs via a label selector.
   926  	*/
   927  	framework.ConformanceIt("should manage the lifecycle of a job", func(ctx context.Context) {
   928  		jobName := "e2e-" + utilrand.String(5)
   929  		label := map[string]string{"e2e-job-label": jobName}
   930  		labelSelector := labels.SelectorFromSet(label).String()
   931  
   932  		ns := f.Namespace.Name
   933  		jobClient := f.ClientSet.BatchV1().Jobs(ns)
   934  
   935  		w := &cache.ListWatch{
   936  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   937  				options.LabelSelector = labelSelector
   938  				return jobClient.Watch(ctx, options)
   939  			},
   940  		}
   941  		jobsList, err := jobClient.List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
   942  		framework.ExpectNoError(err, "failed to list Job")
   943  
   944  		parallelism := int32(2)
   945  		completions := int32(4)
   946  		backoffLimit := int32(6) // default value
   947  
   948  		ginkgo.By("Creating a suspended job")
   949  		job := e2ejob.NewTestJob("succeed", jobName, v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
   950  		job.Labels = label
   951  		job.Spec.Suspend = pointer.BoolPtr(true)
   952  		job, err = e2ejob.CreateJob(ctx, f.ClientSet, ns, job)
   953  		framework.ExpectNoError(err, "failed to create job in namespace: %s", ns)
   954  
   955  		ginkgo.By("Patching the Job")
   956  		payload := "{\"metadata\":{\"labels\":{\"" + jobName + "\":\"patched\"}}}"
   957  		patchedJob, err := f.ClientSet.BatchV1().Jobs(ns).Patch(ctx, jobName, types.StrategicMergePatchType, []byte(payload), metav1.PatchOptions{})
   958  		framework.ExpectNoError(err, "failed to patch Job %s in namespace %s", jobName, ns)
   959  
   960  		ginkgo.By("Watching for Job to be patched")
   961  		c := watchEventConfig{
   962  			framework:           f,
   963  			resourceVersion:     jobsList.ResourceVersion,
   964  			w:                   w,
   965  			jobName:             jobName,
   966  			watchEvent:          watch.Modified,
   967  			extJob:              patchedJob,
   968  			updatedMetadataType: "label",
   969  			updatedKey:          jobName,
   970  			updatedValue:        "patched",
   971  		}
   972  		waitForJobEvent(ctx, c)
   973  		gomega.Expect(patchedJob.Labels).To(gomega.HaveKeyWithValue(jobName, "patched"), "Did not find job label for this job. Current labels: %v", patchedJob.Labels)
   974  
   975  		ginkgo.By("Updating the job")
   976  		var updatedJob *batchv1.Job
   977  
   978  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   979  			patchedJob, err = jobClient.Get(ctx, jobName, metav1.GetOptions{})
   980  			framework.ExpectNoError(err, "Unable to get job %s", jobName)
   981  			patchedJob.Spec.Suspend = pointer.BoolPtr(false)
   982  			if patchedJob.Annotations == nil {
   983  				patchedJob.Annotations = map[string]string{}
   984  			}
   985  			patchedJob.Annotations["updated"] = "true"
   986  			updatedJob, err = e2ejob.UpdateJob(ctx, f.ClientSet, ns, patchedJob)
   987  			return err
   988  		})
   989  		framework.ExpectNoError(err, "failed to update job in namespace: %s", ns)
   990  
   991  		ginkgo.By("Watching for Job to be updated")
   992  		c = watchEventConfig{
   993  			framework:           f,
   994  			resourceVersion:     patchedJob.ResourceVersion,
   995  			w:                   w,
   996  			jobName:             jobName,
   997  			watchEvent:          watch.Modified,
   998  			extJob:              updatedJob,
   999  			updatedMetadataType: "annotation",
  1000  			updatedKey:          "updated",
  1001  			updatedValue:        "true",
  1002  		}
  1003  		waitForJobEvent(ctx, c)
  1004  		gomega.Expect(updatedJob.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated Job should have the applied annotation")
  1005  		framework.Logf("Found Job annotations: %#v", patchedJob.Annotations)
  1006  
  1007  		ginkgo.By("Listing all Jobs with LabelSelector")
  1008  		jobs, err := f.ClientSet.BatchV1().Jobs("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
  1009  		framework.ExpectNoError(err, "Failed to list job. %v", err)
  1010  		gomega.Expect(jobs.Items).To(gomega.HaveLen(1), "Failed to find job %v", jobName)
  1011  		testJob := jobs.Items[0]
  1012  		framework.Logf("Job: %v as labels: %v", testJob.Name, testJob.Labels)
  1013  
  1014  		ginkgo.By("Waiting for job to complete")
  1015  		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, ns, jobName, completions)
  1016  		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", ns)
  1017  
  1018  		ginkgo.By("Delete a job collection with a labelselector")
  1019  		propagationPolicy := metav1.DeletePropagationBackground
  1020  		err = f.ClientSet.BatchV1().Jobs(ns).DeleteCollection(ctx, metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}, metav1.ListOptions{LabelSelector: labelSelector})
  1021  		framework.ExpectNoError(err, "failed to delete job %s in namespace: %s", job.Name, ns)
  1022  
  1023  		ginkgo.By("Watching for Job to be deleted")
  1024  		c = watchEventConfig{
  1025  			framework:           f,
  1026  			resourceVersion:     updatedJob.ResourceVersion,
  1027  			w:                   w,
  1028  			jobName:             jobName,
  1029  			watchEvent:          watch.Deleted,
  1030  			extJob:              &testJob,
  1031  			updatedMetadataType: "label",
  1032  			updatedKey:          "e2e-job-label",
  1033  			updatedValue:        jobName,
  1034  		}
  1035  		waitForJobEvent(ctx, c)
  1036  
  1037  		ginkgo.By("Relist jobs to confirm deletion")
  1038  		jobs, err = f.ClientSet.BatchV1().Jobs("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
  1039  		framework.ExpectNoError(err, "Failed to list job. %v", err)
  1040  		gomega.Expect(jobs.Items).To(gomega.BeEmpty(), "Found job %v", jobName)
  1041  	})
  1042  
  1043  	ginkgo.It("should update the status ready field", func(ctx context.Context) {
  1044  		parallelism := int32(2)
  1045  		completions := int32(4)
  1046  		backoffLimit := int32(6) // default value
  1047  
  1048  		ginkgo.By("Creating a job with suspend=true")
  1049  		job := e2ejob.NewTestJob("notTerminate", "all-ready", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
  1050  		job.Spec.Suspend = ptr.To[bool](true)
  1051  		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
  1052  		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
  1053  
  1054  		ginkgo.By("Ensure the job controller updates the status.ready field")
  1055  		err = e2ejob.WaitForJobReady(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To[int32](0))
  1056  		framework.ExpectNoError(err, "failed to ensure job status ready field in namespace: %s", f.Namespace.Name)
  1057  
  1058  		ginkgo.By("Updating the job with suspend=false")
  1059  		err = updateJobSuspendWithRetries(ctx, f, job, ptr.To[bool](false))
  1060  		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)
  1061  
  1062  		ginkgo.By("Ensure the job controller updates the status.ready field")
  1063  		err = e2ejob.WaitForJobReady(ctx, f.ClientSet, f.Namespace.Name, job.Name, &parallelism)
  1064  		framework.ExpectNoError(err, "failed to ensure job status ready field in namespace: %s", f.Namespace.Name)
  1065  
  1066  		ginkgo.By("Updating the job with suspend=true")
  1067  		err = updateJobSuspendWithRetries(ctx, f, job, ptr.To[bool](true))
  1068  		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)
  1069  
  1070  		ginkgo.By("Ensure the job controller updates the status.ready field")
  1071  		err = e2ejob.WaitForJobReady(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To[int32](0))
  1072  		framework.ExpectNoError(err, "failed to ensure job status ready field in namespace: %s", f.Namespace.Name)
  1073  	})
  1074  })
  1075  
  1076  func updateJobSuspendWithRetries(ctx context.Context, f *framework.Framework, job *batchv1.Job, suspend *bool) error {
  1077  	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
  1078  		job, err := e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
  1079  		framework.ExpectNoError(err, "unable to get job %s in namespace %s", job.Name, f.Namespace.Name)
  1080  		job.Spec.Suspend = suspend
  1081  		_, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
  1082  		return err
  1083  	})
  1084  }
  1085  
  1086  // waitForJobEvent is used to track and log Job events.
  1087  // As delivery of events is not actually guaranteed we
  1088  // will not return an error if we miss the required event.
  1089  func waitForJobEvent(ctx context.Context, config watchEventConfig) {
  1090  	f := config.framework
  1091  	ctx, cancel := context.WithTimeout(ctx, f.Timeouts.PodStartShort)
  1092  	defer cancel()
  1093  	_, err := watchtools.Until(ctx, config.resourceVersion, config.w, func(event watch.Event) (bool, error) {
  1094  		if job, ok := event.Object.(*batchv1.Job); ok {
  1095  
  1096  			var key string
  1097  			switch config.updatedMetadataType {
  1098  			case "annotation":
  1099  				key = job.Annotations[config.updatedKey]
  1100  			case "label":
  1101  				key = job.Labels[config.updatedKey]
  1102  			}
  1103  
  1104  			found := job.ObjectMeta.Name == config.extJob.ObjectMeta.Name &&
  1105  				job.ObjectMeta.Namespace == f.Namespace.Name &&
  1106  				key == config.updatedValue &&
  1107  				event.Type == config.watchEvent
  1108  			if !found {
  1109  				framework.Logf("Event %v observed for Job %v in namespace %v with labels: %v and annotations: %v", event.Type, job.ObjectMeta.Name, job.ObjectMeta.Namespace, job.Labels, job.Annotations)
  1110  				return false, nil
  1111  			}
  1112  			framework.Logf("Event %v found for Job %v in namespace %v with labels: %v and annotations: %v", event.Type, job.ObjectMeta.Name, job.ObjectMeta.Namespace, job.Labels, job.Annotations)
  1113  			return found, nil
  1114  		}
  1115  		framework.Logf("Observed event: %+v", event.Object)
  1116  		return false, nil
  1117  	})
  1118  	if err != nil {
  1119  		j, _ := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).Get(ctx, config.jobName, metav1.GetOptions{})
  1120  		framework.Logf("We missed the %v event. Job details: %+v", config.watchEvent, j)
  1121  	}
  1122  }
  1123  
  1124  // waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.
  1125  func waitForJobFailure(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error {
  1126  	return wait.Poll(framework.Poll, timeout, func() (bool, error) {
  1127  		curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
  1128  		if err != nil {
  1129  			return false, err
  1130  		}
  1131  		for _, c := range curr.Status.Conditions {
  1132  			if c.Type == batchv1.JobFailed && c.Status == v1.ConditionTrue {
  1133  				if reason == "" || reason == c.Reason {
  1134  					return true, nil
  1135  				}
  1136  			}
  1137  		}
  1138  		return false, nil
  1139  	})
  1140  }
  1141  
  1142  func findConditionByType(list []batchv1.JobCondition, cType batchv1.JobConditionType) *batchv1.JobCondition {
  1143  	for i := range list {
  1144  		if list[i].Type == cType {
  1145  			return &list[i]
  1146  		}
  1147  	}
  1148  	return nil
  1149  }
  1150  

View as plain text