...

Source file src/k8s.io/kubernetes/test/e2e/framework/pod/pod_client.go

Documentation: k8s.io/kubernetes/test/e2e/framework/pod

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pod
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"regexp"
    24  	"sync"
    25  	"time"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	"k8s.io/apimachinery/pkg/util/sets"
    32  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	"k8s.io/client-go/kubernetes/scheme"
    35  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    36  	"k8s.io/kubectl/pkg/util/podutils"
    37  
    38  	"github.com/onsi/ginkgo/v2"
    39  	"github.com/onsi/gomega"
    40  
    41  	"k8s.io/kubernetes/test/e2e/framework"
    42  )
    43  
const (
	// DefaultPodDeletionTimeout is the default timeout for deleting a pod.
	DefaultPodDeletionTimeout = 3 * time.Minute

	// killingContainer is an event reason that WaitForErrorEventOrSuccess
	// treats as an error event. The value is copied from
	// k8s.io/kubernetes/pkg/kubelet/events.
	killingContainer = "Killing"

	// failedToCreateContainer is an event reason that
	// WaitForErrorEventOrSuccess treats as an error event. The value is
	// copied from k8s.io/kubernetes/pkg/kubelet/events.
	failedToCreateContainer = "Failed"

	// startedContainer is the event reason that WaitForErrorEventOrSuccess
	// treats as the non-error outcome. The value is copied from
	// k8s.io/kubernetes/pkg/kubelet/events.
	startedContainer = "Started"

	// forbiddenReason is an event reason that WaitForErrorEventOrSuccess
	// treats as an error event. The value is copied from
	// k8s.io/kubernetes/pkg/kubelet/sysctl.
	forbiddenReason = "SysctlForbidden"
)
    60  
// ImagePrePullList is the set of images used in the current test suite. It should be
// initialized by the test suite, and the images in it should be pre-pulled by the test
// suite. Currently, this is only used by node e2e tests: when image pre-pulling is
// enabled, mungeSpec requires every container image whose pull policy is not
// PullAlways to be a member of this set.
var ImagePrePullList sets.String
    65  
    66  // NewPodClient is a convenience method for getting a pod client interface in the framework's namespace,
    67  // possibly applying test-suite specific transformations to the pod spec, e.g. for
    68  // node e2e pod scheduling.
    69  func NewPodClient(f *framework.Framework) *PodClient {
    70  	return &PodClient{
    71  		f:            f,
    72  		PodInterface: f.ClientSet.CoreV1().Pods(f.Namespace.Name),
    73  		namespace:    f.Namespace.Name,
    74  	}
    75  }
    76  
    77  // PodClientNS is a convenience method for getting a pod client interface in an alternative namespace,
    78  // possibly applying test-suite specific transformations to the pod spec, e.g. for
    79  // node e2e pod scheduling.
    80  func PodClientNS(f *framework.Framework, namespace string) *PodClient {
    81  	return &PodClient{
    82  		f:            f,
    83  		PodInterface: f.ClientSet.CoreV1().Pods(namespace),
    84  		namespace:    namespace,
    85  	}
    86  }
    87  
// PodClient is a pod client bound to a single namespace. It embeds the typed
// core/v1 PodInterface and layers test-oriented helpers on top of it; several
// of those helpers fail the running test instead of returning an error.
type PodClient struct {
	// f is the test framework the client was created from; its ClientSet is
	// used by the wait helpers.
	f *framework.Framework
	// PodInterface is the typed pod client for the namespace below.
	v1core.PodInterface
	// namespace is the namespace this client's pods live in.
	namespace string
}
    94  
    95  // Create creates a new pod according to the framework specifications (don't wait for it to start).
    96  func (c *PodClient) Create(ctx context.Context, pod *v1.Pod) *v1.Pod {
    97  	c.mungeSpec(pod)
    98  	p, err := c.PodInterface.Create(ctx, pod, metav1.CreateOptions{})
    99  	framework.ExpectNoError(err, "Error creating Pod")
   100  	return p
   101  }
   102  
   103  // CreateSync creates a new pod according to the framework specifications, and wait for it to start and be running and ready.
   104  func (c *PodClient) CreateSync(ctx context.Context, pod *v1.Pod) *v1.Pod {
   105  	p := c.Create(ctx, pod)
   106  	framework.ExpectNoError(WaitTimeoutForPodReadyInNamespace(ctx, c.f.ClientSet, p.Name, c.namespace, framework.PodStartTimeout))
   107  	// Get the newest pod after it becomes running and ready, some status may change after pod created, such as pod ip.
   108  	p, err := c.Get(ctx, p.Name, metav1.GetOptions{})
   109  	framework.ExpectNoError(err)
   110  	return p
   111  }
   112  
   113  // CreateBatch create a batch of pods. All pods are created before waiting.
   114  func (c *PodClient) CreateBatch(ctx context.Context, pods []*v1.Pod) []*v1.Pod {
   115  	ps := make([]*v1.Pod, len(pods))
   116  	var wg sync.WaitGroup
   117  	for i, pod := range pods {
   118  		wg.Add(1)
   119  		go func(i int, pod *v1.Pod) {
   120  			defer wg.Done()
   121  			defer ginkgo.GinkgoRecover()
   122  			ps[i] = c.CreateSync(ctx, pod)
   123  		}(i, pod)
   124  	}
   125  	wg.Wait()
   126  	return ps
   127  }
   128  
   129  // Update updates the pod object. It retries if there is a conflict, throw out error if
   130  // there is any other apierrors. name is the pod name, updateFn is the function updating the
   131  // pod object.
   132  func (c *PodClient) Update(ctx context.Context, name string, updateFn func(pod *v1.Pod)) {
   133  	framework.ExpectNoError(wait.PollWithContext(ctx, time.Millisecond*500, time.Second*30, func(ctx context.Context) (bool, error) {
   134  		pod, err := c.PodInterface.Get(ctx, name, metav1.GetOptions{})
   135  		if err != nil {
   136  			return false, fmt.Errorf("failed to get pod %q: %w", name, err)
   137  		}
   138  		updateFn(pod)
   139  		_, err = c.PodInterface.Update(ctx, pod, metav1.UpdateOptions{})
   140  		if err == nil {
   141  			framework.Logf("Successfully updated pod %q", name)
   142  			return true, nil
   143  		}
   144  		if apierrors.IsConflict(err) {
   145  			framework.Logf("Conflicting update to pod %q, re-get and re-update: %v", name, err)
   146  			return false, nil
   147  		}
   148  		return false, fmt.Errorf("failed to update pod %q: %w", name, err)
   149  	}))
   150  }
   151  
   152  // AddEphemeralContainerSync adds an EphemeralContainer to a pod and waits for it to be running.
   153  func (c *PodClient) AddEphemeralContainerSync(ctx context.Context, pod *v1.Pod, ec *v1.EphemeralContainer, timeout time.Duration) error {
   154  	podJS, err := json.Marshal(pod)
   155  	framework.ExpectNoError(err, "error creating JSON for pod %q", FormatPod(pod))
   156  
   157  	ecPod := pod.DeepCopy()
   158  	ecPod.Spec.EphemeralContainers = append(ecPod.Spec.EphemeralContainers, *ec)
   159  	ecJS, err := json.Marshal(ecPod)
   160  	framework.ExpectNoError(err, "error creating JSON for pod with ephemeral container %q", FormatPod(pod))
   161  
   162  	patch, err := strategicpatch.CreateTwoWayMergePatch(podJS, ecJS, pod)
   163  	framework.ExpectNoError(err, "error creating patch to add ephemeral container %q", FormatPod(pod))
   164  
   165  	// Clients may optimistically attempt to add an ephemeral container to determine whether the EphemeralContainers feature is enabled.
   166  	if _, err := c.Patch(ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "ephemeralcontainers"); err != nil {
   167  		return err
   168  	}
   169  
   170  	framework.ExpectNoError(WaitForContainerRunning(ctx, c.f.ClientSet, c.namespace, pod.Name, ec.Name, timeout))
   171  	return nil
   172  }
   173  
   174  // FormatPod returns a string representing a pod in a consistent human readable format,
   175  // with pod name, namespace and pod UID as part of the string.
   176  // This code is taken from k/k/pkg/kubelet/util/format/pod.go to remove
   177  // e2e framework -> k/k/pkg/kubelet dependency.
   178  func FormatPod(pod *v1.Pod) string {
   179  	if pod == nil {
   180  		return "<nil>"
   181  	}
   182  	return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.UID)
   183  }
   184  
   185  // DeleteSync deletes the pod and wait for the pod to disappear for `timeout`. If the pod doesn't
   186  // disappear before the timeout, it will fail the test.
   187  func (c *PodClient) DeleteSync(ctx context.Context, name string, options metav1.DeleteOptions, timeout time.Duration) {
   188  	err := c.Delete(ctx, name, options)
   189  	if err != nil && !apierrors.IsNotFound(err) {
   190  		framework.Failf("Failed to delete pod %q: %v", name, err)
   191  	}
   192  	framework.ExpectNoError(WaitForPodNotFoundInNamespace(ctx, c.f.ClientSet, name, c.namespace, timeout), "wait for pod %q to disappear", name)
   193  }
   194  
   195  // mungeSpec apply test-suite specific transformations to the pod spec.
   196  func (c *PodClient) mungeSpec(pod *v1.Pod) {
   197  	if !framework.TestContext.NodeE2E {
   198  		return
   199  	}
   200  
   201  	gomega.Expect(pod.Spec.NodeName).To(gomega.Or(gomega.BeZero(), gomega.Equal(framework.TestContext.NodeName)), "Test misconfigured")
   202  	pod.Spec.NodeName = framework.TestContext.NodeName
   203  	// Node e2e does not support the default DNSClusterFirst policy. Set
   204  	// the policy to DNSDefault, which is configured per node.
   205  	pod.Spec.DNSPolicy = v1.DNSDefault
   206  
   207  	// PrepullImages only works for node e2e now. For cluster e2e, image prepull is not enforced,
   208  	// we should not munge ImagePullPolicy for cluster e2e pods.
   209  	if !framework.TestContext.PrepullImages {
   210  		return
   211  	}
   212  	// If prepull is enabled, munge the container spec to make sure the images are not pulled
   213  	// during the test.
   214  	for i := range pod.Spec.Containers {
   215  		c := &pod.Spec.Containers[i]
   216  		if c.ImagePullPolicy == v1.PullAlways {
   217  			// If the image pull policy is PullAlways, the image doesn't need to be in
   218  			// the allow list or pre-pulled, because the image is expected to be pulled
   219  			// in the test anyway.
   220  			continue
   221  		}
   222  		// If the image policy is not PullAlways, the image must be in the pre-pull list and
   223  		// pre-pulled.
   224  		gomega.Expect(ImagePrePullList.Has(c.Image)).To(gomega.BeTrue(), "Image %q is not in the pre-pull list, consider adding it to PrePulledImages in test/e2e/common/util.go or NodePrePullImageList in test/e2e_node/image_list.go", c.Image)
   225  		// Do not pull images during the tests because the images in pre-pull list should have
   226  		// been prepulled.
   227  		c.ImagePullPolicy = v1.PullNever
   228  	}
   229  }
   230  
   231  // WaitForSuccess waits for pod to succeed.
   232  // TODO(random-liu): Move pod wait function into this file
   233  func (c *PodClient) WaitForSuccess(ctx context.Context, name string, timeout time.Duration) {
   234  	gomega.Expect(WaitForPodCondition(ctx, c.f.ClientSet, c.namespace, name, fmt.Sprintf("%s or %s", v1.PodSucceeded, v1.PodFailed), timeout,
   235  		func(pod *v1.Pod) (bool, error) {
   236  			switch pod.Status.Phase {
   237  			case v1.PodFailed:
   238  				return true, fmt.Errorf("pod %q failed with reason: %q, message: %q", name, pod.Status.Reason, pod.Status.Message)
   239  			case v1.PodSucceeded:
   240  				return true, nil
   241  			default:
   242  				return false, nil
   243  			}
   244  		},
   245  	)).To(gomega.Succeed(), "wait for pod %q to succeed", name)
   246  }
   247  
   248  // WaitForFinish waits for pod to finish running, regardless of success or failure.
   249  func (c *PodClient) WaitForFinish(ctx context.Context, name string, timeout time.Duration) {
   250  	gomega.Expect(WaitForPodCondition(ctx, c.f.ClientSet, c.namespace, name, fmt.Sprintf("%s or %s", v1.PodSucceeded, v1.PodFailed), timeout,
   251  		func(pod *v1.Pod) (bool, error) {
   252  			switch pod.Status.Phase {
   253  			case v1.PodFailed:
   254  				return true, nil
   255  			case v1.PodSucceeded:
   256  				return true, nil
   257  			default:
   258  				return false, nil
   259  			}
   260  		},
   261  	)).To(gomega.Succeed(), "wait for pod %q to finish running", name)
   262  }
   263  
   264  // WaitForErrorEventOrSuccess waits for pod to succeed or an error event for that pod.
   265  func (c *PodClient) WaitForErrorEventOrSuccess(ctx context.Context, pod *v1.Pod) (*v1.Event, error) {
   266  	var ev *v1.Event
   267  	err := wait.PollWithContext(ctx, framework.Poll, framework.PodStartTimeout, func(ctx context.Context) (bool, error) {
   268  		evnts, err := c.f.ClientSet.CoreV1().Events(pod.Namespace).Search(scheme.Scheme, pod)
   269  		if err != nil {
   270  			return false, fmt.Errorf("error in listing events: %w", err)
   271  		}
   272  		for _, e := range evnts.Items {
   273  			switch e.Reason {
   274  			case killingContainer, failedToCreateContainer, forbiddenReason:
   275  				ev = &e
   276  				return true, nil
   277  			case startedContainer:
   278  				return true, nil
   279  			default:
   280  				// ignore all other errors
   281  			}
   282  		}
   283  		return false, nil
   284  	})
   285  	return ev, err
   286  }
   287  
   288  // MatchContainerOutput gets output of a container and match expected regexp in the output.
   289  func (c *PodClient) MatchContainerOutput(ctx context.Context, name string, containerName string, expectedRegexp string) error {
   290  	f := c.f
   291  	output, err := GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, name, containerName)
   292  	if err != nil {
   293  		return fmt.Errorf("failed to get output for container %q of pod %q", containerName, name)
   294  	}
   295  	regex, err := regexp.Compile(expectedRegexp)
   296  	if err != nil {
   297  		return fmt.Errorf("failed to compile regexp %q: %w", expectedRegexp, err)
   298  	}
   299  	if !regex.MatchString(output) {
   300  		return fmt.Errorf("failed to match regexp %q in output %q", expectedRegexp, output)
   301  	}
   302  	return nil
   303  }
   304  
   305  // PodIsReady returns true if the specified pod is ready. Otherwise false.
   306  func (c *PodClient) PodIsReady(ctx context.Context, name string) bool {
   307  	pod, err := c.Get(ctx, name, metav1.GetOptions{})
   308  	framework.ExpectNoError(err)
   309  	return podutils.IsPodReady(pod)
   310  }
   311  
   312  // RemoveString returns a newly created []string that contains all items from slice
   313  // that are not equal to s.
   314  // This code is taken from k/k/pkg/util/slice/slice.go to remove
   315  // e2e/framework/pod -> k/k/pkg/util/slice dependency.
   316  func removeString(slice []string, s string) []string {
   317  	newSlice := make([]string, 0)
   318  	for _, item := range slice {
   319  		if item != s {
   320  			newSlice = append(newSlice, item)
   321  		}
   322  	}
   323  	if len(newSlice) == 0 {
   324  		// Sanitize for unit tests so we don't need to distinguish empty array
   325  		// and nil.
   326  		return nil
   327  	}
   328  	return newSlice
   329  }
   330  
   331  // RemoveFinalizer removes the pod's finalizer
   332  func (c *PodClient) RemoveFinalizer(ctx context.Context, podName string, finalizerName string) {
   333  	framework.Logf("Removing pod's %q finalizer: %q", podName, finalizerName)
   334  	c.Update(ctx, podName, func(pod *v1.Pod) {
   335  		pod.ObjectMeta.Finalizers = removeString(pod.ObjectMeta.Finalizers, finalizerName)
   336  	})
   337  }
   338  

View as plain text