/* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package e2enode import ( "context" "crypto/tls" "encoding/json" "flag" "fmt" "io" "net" "net/http" "os" "os/exec" "regexp" "strconv" "strings" "time" "k8s.io/kubernetes/pkg/util/procfs" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" oteltrace "go.opentelemetry.io/otel/trace" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/component-base/featuregate" internalapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "k8s.io/kubelet/pkg/types" "k8s.io/kubernetes/pkg/cluster/ports" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cri/remote" kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/util" "github.com/coreos/go-systemd/v22/dbus" "k8s.io/kubernetes/test/e2e/framework" e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2enodekubelet "k8s.io/kubernetes/test/e2e_node/kubeletconfig" imageutils "k8s.io/kubernetes/test/utils/image" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" ) var startServices = flag.Bool("start-services", true, "If true, start local node services") var stopServices = flag.Bool("stop-services", true, "If true, stop local node services after running tests") var busyboxImage = imageutils.GetE2EImage(imageutils.BusyBox) var agnhostImage = imageutils.GetE2EImage(imageutils.Agnhost) const ( // Kubelet internal cgroup name for node allocatable cgroup. defaultNodeAllocatableCgroup = "kubepods" // defaultPodResourcesPath is the path to the local endpoint serving the podresources GRPC service. defaultPodResourcesPath = "/var/lib/kubelet/pod-resources" defaultPodResourcesTimeout = 10 * time.Second defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb // state files cpuManagerStateFile = "/var/lib/kubelet/cpu_manager_state" memoryManagerStateFile = "/var/lib/kubelet/memory_manager_state" ) var ( kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort) containerRuntimeUnitName = "" // KubeletConfig is the kubelet configuration the test is running against. kubeletCfg *kubeletconfig.KubeletConfiguration ) func getNodeSummary(ctx context.Context) (*stats.Summary, error) { kubeletConfig, err := getCurrentKubeletConfig(ctx) if err != nil { return nil, fmt.Errorf("failed to get current kubelet config") } req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil) if err != nil { return nil, fmt.Errorf("failed to build http request: %w", err) } req.Header.Add("Accept", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { return nil, fmt.Errorf("failed to get /stats/summary: %w", err) } defer resp.Body.Close() contentsBytes, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read /stats/summary: %+v", resp) } decoder := json.NewDecoder(strings.NewReader(string(contentsBytes))) summary := stats.Summary{} err = decoder.Decode(&summary) if err != nil { return nil, fmt.Errorf("failed to parse /stats/summary to go struct: %+v", resp) } return &summary, nil } func getV1alpha1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1alpha1.ListPodResourcesResponse, error) { endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) if err != nil { return nil, fmt.Errorf("Error getting local endpoint: %w", err) } client, conn, err := podresources.GetV1alpha1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) if err != nil { return nil, fmt.Errorf("Error getting grpc client: %w", err) } defer conn.Close() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() resp, err := client.List(ctx, &kubeletpodresourcesv1alpha1.ListPodResourcesRequest{}) if err != nil { return nil, fmt.Errorf("%v.Get(_) = _, %v", client, err) } return resp, nil } func getV1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1.ListPodResourcesResponse, error) { endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) if err != nil { return nil, fmt.Errorf("Error getting local endpoint: %w", err) } client, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) if err != nil { return nil, fmt.Errorf("Error getting gRPC client: %w", err) } defer conn.Close() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() resp, err := client.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{}) if err != nil { return nil, fmt.Errorf("%v.Get(_) = _, %v", client, err) } return resp, nil } // Returns the current KubeletConfiguration func getCurrentKubeletConfig(ctx context.Context) (*kubeletconfig.KubeletConfiguration, error) { // namespace only relevant if useProxy==true, so we don't bother return e2enodekubelet.GetCurrentKubeletConfig(ctx, framework.TestContext.NodeName, "", false, framework.TestContext.StandaloneMode) } func cleanupPods(f *framework.Framework) { ginkgo.AfterEach(func(ctx context.Context) { ginkgo.By("Deleting any Pods created by the test in namespace: " + f.Namespace.Name) l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) framework.ExpectNoError(err) for _, p := range l.Items { if p.Namespace != f.Namespace.Name { continue } framework.Logf("Deleting pod: %s", p.Name) e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) } }) } // Must be called within a Context. Allows the function to modify the KubeletConfiguration during the BeforeEach of the context. // The change is reverted in the AfterEach of the context. // Returns true on success. func tempSetCurrentKubeletConfig(f *framework.Framework, updateFunction func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration)) { var oldCfg *kubeletconfig.KubeletConfiguration ginkgo.BeforeEach(func(ctx context.Context) { var err error oldCfg, err = getCurrentKubeletConfig(ctx) framework.ExpectNoError(err) newCfg := oldCfg.DeepCopy() updateFunction(ctx, newCfg) if apiequality.Semantic.DeepEqual(*newCfg, *oldCfg) { return } updateKubeletConfig(ctx, f, newCfg, true) }) ginkgo.AfterEach(func(ctx context.Context) { if oldCfg != nil { // Update the Kubelet configuration. updateKubeletConfig(ctx, f, oldCfg, true) } }) } func updateKubeletConfig(ctx context.Context, f *framework.Framework, kubeletConfig *kubeletconfig.KubeletConfiguration, deleteStateFiles bool) { // Update the Kubelet configuration. ginkgo.By("Stopping the kubelet") startKubelet := stopKubelet() // wait until the kubelet health check will fail gomega.Eventually(ctx, func() bool { return kubeletHealthCheck(kubeletHealthCheckURL) }, time.Minute, time.Second).Should(gomega.BeFalse()) // Delete CPU and memory manager state files to be sure it will not prevent the kubelet restart if deleteStateFiles { deleteStateFile(cpuManagerStateFile) deleteStateFile(memoryManagerStateFile) } framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(kubeletConfig)) ginkgo.By("Starting the kubelet") startKubelet() waitForKubeletToStart(ctx, f) } func waitForKubeletToStart(ctx context.Context, f *framework.Framework) { // wait until the kubelet health check will succeed gomega.Eventually(ctx, func() bool { return kubeletHealthCheck(kubeletHealthCheckURL) }, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue()) // Wait for the Kubelet to be ready. gomega.Eventually(ctx, func(ctx context.Context) bool { nodes, err := e2enode.TotalReady(ctx, f.ClientSet) framework.ExpectNoError(err) return nodes == 1 }, time.Minute, time.Second).Should(gomega.BeTrue()) } func deleteStateFile(stateFileName string) { err := exec.Command("/bin/sh", "-c", fmt.Sprintf("rm -f %s", stateFileName)).Run() framework.ExpectNoError(err, "failed to delete the state file") } // listNamespaceEvents lists the events in the given namespace. func listNamespaceEvents(ctx context.Context, c clientset.Interface, ns string) error { ls, err := c.CoreV1().Events(ns).List(ctx, metav1.ListOptions{}) if err != nil { return err } for _, event := range ls.Items { klog.Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message) } return nil } func logPodEvents(ctx context.Context, f *framework.Framework) { framework.Logf("Summary of pod events during the test:") err := listNamespaceEvents(ctx, f.ClientSet, f.Namespace.Name) framework.ExpectNoError(err) } func logNodeEvents(ctx context.Context, f *framework.Framework) { framework.Logf("Summary of node events during the test:") err := listNamespaceEvents(ctx, f.ClientSet, "") framework.ExpectNoError(err) } func getLocalNode(ctx context.Context, f *framework.Framework) *v1.Node { nodeList, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet) framework.ExpectNoError(err) gomega.Expect(nodeList.Items).Should(gomega.HaveLen(1), "Unexpected number of node objects for node e2e. Expects only one node.") return &nodeList.Items[0] } // getLocalTestNode fetches the node object describing the local worker node set up by the e2e_node infra, alongside with its ready state. // getLocalTestNode is a variant of `getLocalNode` which reports but does not set any requirement about the node readiness state, letting // the caller decide. The check is intentionally done like `getLocalNode` does. // Note `getLocalNode` aborts (as in ginkgo.Expect) the test implicitly if the worker node is not ready. func getLocalTestNode(ctx context.Context, f *framework.Framework) (*v1.Node, bool) { node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, framework.TestContext.NodeName, metav1.GetOptions{}) framework.ExpectNoError(err) ready := e2enode.IsNodeReady(node) schedulable := e2enode.IsNodeSchedulable(node) framework.Logf("node %q ready=%v schedulable=%v", node.Name, ready, schedulable) return node, ready && schedulable } // logKubeletLatencyMetrics logs KubeletLatencyMetrics computed from the Prometheus // metrics exposed on the current node and identified by the metricNames. // The Kubelet subsystem prefix is automatically prepended to these metric names. func logKubeletLatencyMetrics(ctx context.Context, metricNames ...string) { metricSet := sets.NewString() for _, key := range metricNames { metricSet.Insert(kubeletmetrics.KubeletSubsystem + "_" + key) } metric, err := e2emetrics.GrabKubeletMetricsWithoutProxy(ctx, fmt.Sprintf("%s:%d", nodeNameOrIP(), ports.KubeletReadOnlyPort), "/metrics") if err != nil { framework.Logf("Error getting kubelet metrics: %v", err) } else { framework.Logf("Kubelet Metrics: %+v", e2emetrics.GetKubeletLatencyMetrics(metric, metricSet)) } } // getCRIClient connects CRI and returns CRI runtime service clients and image service client. func getCRIClient() (internalapi.RuntimeService, internalapi.ImageManagerService, error) { // connection timeout for CRI service connection const connectionTimeout = 2 * time.Minute runtimeEndpoint := framework.TestContext.ContainerRuntimeEndpoint r, err := remote.NewRemoteRuntimeService(runtimeEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider()) if err != nil { return nil, nil, err } imageManagerEndpoint := runtimeEndpoint if framework.TestContext.ImageServiceEndpoint != "" { //ImageServiceEndpoint is the same as ContainerRuntimeEndpoint if not //explicitly specified imageManagerEndpoint = framework.TestContext.ImageServiceEndpoint } i, err := remote.NewRemoteImageService(imageManagerEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider()) if err != nil { return nil, nil, err } return r, i, nil } // findKubeletServiceName searches the unit name among the services known to systemd. // if the `running` parameter is true, restricts the search among currently running services; // otherwise, also stopped, failed, exited (non-running in general) services are also considered. // TODO: Find a uniform way to deal with systemctl/initctl/service operations. #34494 func findKubeletServiceName(running bool) string { cmdLine := []string{ "systemctl", "list-units", "*kubelet*", } if running { cmdLine = append(cmdLine, "--state=running") } stdout, err := exec.Command("sudo", cmdLine...).CombinedOutput() framework.ExpectNoError(err) regex := regexp.MustCompile("(kubelet-\\w+)") matches := regex.FindStringSubmatch(string(stdout)) gomega.Expect(matches).ToNot(gomega.BeEmpty(), "Found more than one kubelet service running: %q", stdout) kubeletServiceName := matches[0] framework.Logf("Get running kubelet with systemctl: %v, %v", string(stdout), kubeletServiceName) return kubeletServiceName } func findContainerRuntimeServiceName() (string, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() conn, err := dbus.NewWithContext(ctx) framework.ExpectNoError(err, "Failed to setup dbus connection") defer conn.Close() runtimePids, err := getPidsForProcess(framework.TestContext.ContainerRuntimeProcessName, framework.TestContext.ContainerRuntimePidFile) framework.ExpectNoError(err, "failed to get list of container runtime pids") gomega.Expect(runtimePids).To(gomega.HaveLen(1), "Unexpected number of container runtime pids. Expected 1 but got %v", len(runtimePids)) containerRuntimePid := runtimePids[0] unitName, err := conn.GetUnitNameByPID(ctx, uint32(containerRuntimePid)) framework.ExpectNoError(err, "Failed to get container runtime unit name") return unitName, nil } type containerRuntimeUnitOp int const ( startContainerRuntimeUnitOp containerRuntimeUnitOp = iota stopContainerRuntimeUnitOp ) func performContainerRuntimeUnitOp(op containerRuntimeUnitOp) error { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() conn, err := dbus.NewWithContext(ctx) framework.ExpectNoError(err, "Failed to setup dbus connection") defer conn.Close() if containerRuntimeUnitName == "" { containerRuntimeUnitName, err = findContainerRuntimeServiceName() framework.ExpectNoError(err, "Failed to find container runtime name") } reschan := make(chan string) switch op { case startContainerRuntimeUnitOp: _, err = conn.StartUnitContext(ctx, containerRuntimeUnitName, "replace", reschan) case stopContainerRuntimeUnitOp: _, err = conn.StopUnitContext(ctx, containerRuntimeUnitName, "replace", reschan) default: framework.Failf("Unexpected container runtime op: %v", op) } framework.ExpectNoError(err, "dbus connection error") job := <-reschan gomega.Expect(job).To(gomega.Equal("done"), "Expected job to complete with done") return nil } func stopContainerRuntime() error { return performContainerRuntimeUnitOp(stopContainerRuntimeUnitOp) } func startContainerRuntime() error { return performContainerRuntimeUnitOp(startContainerRuntimeUnitOp) } // restartKubelet restarts the current kubelet service. // the "current" kubelet service is the instance managed by the current e2e_node test run. // If `running` is true, restarts only if the current kubelet is actually running. In some cases, // the kubelet may have exited or can be stopped, typically because it was intentionally stopped // earlier during a test, or, sometimes, because it just crashed. // Warning: the "current" kubelet is poorly defined. The "current" kubelet is assumed to be the most // recent kubelet service unit, IOW there is not a unique ID we use to bind explicitly a kubelet // instance to a test run. func restartKubelet(running bool) { kubeletServiceName := findKubeletServiceName(running) // reset the kubelet service start-limit-hit stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput() framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %s", err, string(stdout)) stdout, err = exec.Command("sudo", "systemctl", "restart", kubeletServiceName).CombinedOutput() framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %s", err, string(stdout)) } // stopKubelet will kill the running kubelet, and returns a func that will restart the process again func stopKubelet() func() { kubeletServiceName := findKubeletServiceName(true) // reset the kubelet service start-limit-hit stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput() framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %s", err, string(stdout)) stdout, err = exec.Command("sudo", "systemctl", "kill", kubeletServiceName).CombinedOutput() framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %s", err, string(stdout)) return func() { // we should restart service, otherwise the transient service start will fail stdout, err := exec.Command("sudo", "systemctl", "restart", kubeletServiceName).CombinedOutput() framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %v", err, stdout) } } // killKubelet sends a signal (SIGINT, SIGSTOP, SIGTERM...) to the running kubelet func killKubelet(sig string) { kubeletServiceName := findKubeletServiceName(true) // reset the kubelet service start-limit-hit stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput() framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %v", err, stdout) stdout, err = exec.Command("sudo", "systemctl", "kill", "-s", sig, kubeletServiceName).CombinedOutput() framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %v", err, stdout) } func kubeletHealthCheck(url string) bool { insecureTransport := http.DefaultTransport.(*http.Transport).Clone() insecureTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} insecureHTTPClient := &http.Client{ Transport: insecureTransport, } req, err := http.NewRequest("HEAD", url, nil) if err != nil { return false } req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", framework.TestContext.BearerToken)) resp, err := insecureHTTPClient.Do(req) if err != nil { klog.Warningf("Health check on %q failed, error=%v", url, err) } else if resp.StatusCode != http.StatusOK { klog.Warningf("Health check on %q failed, status=%d", url, resp.StatusCode) } return err == nil && resp.StatusCode == http.StatusOK } func toCgroupFsName(cgroupName cm.CgroupName) string { if kubeletCfg.CgroupDriver == "systemd" { return cgroupName.ToSystemd() } return cgroupName.ToCgroupfs() } // reduceAllocatableMemoryUsageIfCgroupv1 uses memory.force_empty (https://lwn.net/Articles/432224/) // to make the kernel reclaim memory in the allocatable cgroup // the time to reduce pressure may be unbounded, but usually finishes within a second. // memory.force_empty is no supported in cgroupv2. func reduceAllocatableMemoryUsageIfCgroupv1() { if !IsCgroup2UnifiedMode() { cmd := fmt.Sprintf("echo 0 > /sys/fs/cgroup/memory/%s/memory.force_empty", toCgroupFsName(cm.NewCgroupName(cm.RootCgroupName, defaultNodeAllocatableCgroup))) _, err := exec.Command("sudo", "sh", "-c", cmd).CombinedOutput() framework.ExpectNoError(err) } } // Equivalent of featuregatetesting.SetFeatureGateDuringTest // which can't be used here because we're not in a Testing context. // This must be in a non-"_test" file to pass // make verify WHAT=test-featuregates func withFeatureGate(feature featuregate.Feature, desired bool) func() { current := utilfeature.DefaultFeatureGate.Enabled(feature) utilfeature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=%v", string(feature), desired)) return func() { utilfeature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=%v", string(feature), current)) } } // waitForAllContainerRemoval waits until all the containers on a given pod are really gone. // This is needed by the e2e tests which involve exclusive resource allocation (cpu, topology manager; podresources; etc.) // In these cases, we need to make sure the tests clean up after themselves to make sure each test runs in // a pristine environment. The only way known so far to do that is to introduce this wait. // Worth noting, however, that this makes the test runtime much bigger. func waitForAllContainerRemoval(ctx context.Context, podName, podNS string) { rs, _, err := getCRIClient() framework.ExpectNoError(err) gomega.Eventually(ctx, func(ctx context.Context) error { containers, err := rs.ListContainers(ctx, &runtimeapi.ContainerFilter{ LabelSelector: map[string]string{ types.KubernetesPodNameLabel: podName, types.KubernetesPodNamespaceLabel: podNS, }, }) if err != nil { return fmt.Errorf("got error waiting for all containers to be removed from CRI: %v", err) } if len(containers) > 0 { return fmt.Errorf("expected all containers to be removed from CRI but %v containers still remain. Containers: %+v", len(containers), containers) } return nil }, 2*time.Minute, 1*time.Second).Should(gomega.Succeed()) } func getPidsForProcess(name, pidFile string) ([]int, error) { if len(pidFile) > 0 { pid, err := getPidFromPidFile(pidFile) if err == nil { return []int{pid}, nil } // log the error and fall back to pidof runtime.HandleError(err) } return procfs.PidOf(name) } func getPidFromPidFile(pidFile string) (int, error) { file, err := os.Open(pidFile) if err != nil { return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err) } defer file.Close() data, err := io.ReadAll(file) if err != nil { return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err) } pid, err := strconv.Atoi(string(data)) if err != nil { return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err) } return pid, nil } // WaitForPodInitContainerRestartCount waits for the given Pod init container // to achieve at least a given restartCount // TODO: eventually look at moving to test/e2e/framework/pod func WaitForPodInitContainerRestartCount(ctx context.Context, c clientset.Interface, namespace, podName string, initContainerIndex int, desiredRestartCount int32, timeout time.Duration) error { conditionDesc := fmt.Sprintf("init container %d started", initContainerIndex) return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) { if initContainerIndex > len(pod.Status.InitContainerStatuses)-1 { return false, nil } containerStatus := pod.Status.InitContainerStatuses[initContainerIndex] return containerStatus.RestartCount >= desiredRestartCount, nil }) } // WaitForPodContainerRestartCount waits for the given Pod container to achieve at least a given restartCount // TODO: eventually look at moving to test/e2e/framework/pod func WaitForPodContainerRestartCount(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, desiredRestartCount int32, timeout time.Duration) error { conditionDesc := fmt.Sprintf("container %d started", containerIndex) return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) { if containerIndex > len(pod.Status.ContainerStatuses)-1 { return false, nil } containerStatus := pod.Status.ContainerStatuses[containerIndex] return containerStatus.RestartCount >= desiredRestartCount, nil }) } // WaitForPodInitContainerToFail waits for the given Pod init container to fail with the given reason, specifically due to // invalid container configuration. In this case, the container will remain in a waiting state with a specific // reason set, which should match the given reason. // TODO: eventually look at moving to test/e2e/framework/pod func WaitForPodInitContainerToFail(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error { conditionDesc := fmt.Sprintf("container %d failed with reason %s", containerIndex, reason) return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) { switch pod.Status.Phase { case v1.PodPending: if len(pod.Status.InitContainerStatuses) == 0 { return false, nil } containerStatus := pod.Status.InitContainerStatuses[containerIndex] if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == reason { return true, nil } return false, nil case v1.PodFailed, v1.PodRunning, v1.PodSucceeded: return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase) } return false, nil }) } func nodeNameOrIP() string { return "localhost" }