package e2enode

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"regexp"
	"strconv"
	"strings"
	"time"

	"k8s.io/kubernetes/pkg/util/procfs"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"

	oteltrace "go.opentelemetry.io/otel/trace"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/component-base/featuregate"
	internalapi "k8s.io/cri-api/pkg/apis"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	"k8s.io/klog/v2"
	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
	kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
	"k8s.io/kubelet/pkg/types"
	"k8s.io/kubernetes/pkg/cluster/ports"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/cri/remote"
	kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/util"

	"github.com/coreos/go-systemd/v22/dbus"
	"k8s.io/kubernetes/test/e2e/framework"
	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2enodekubelet "k8s.io/kubernetes/test/e2e_node/kubeletconfig"
	imageutils "k8s.io/kubernetes/test/utils/image"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)

var startServices = flag.Bool("start-services", true, "If true, start local node services")
var stopServices = flag.Bool("stop-services", true, "If true, stop local node services after running tests")
var busyboxImage = imageutils.GetE2EImage(imageutils.BusyBox)
var agnhostImage = imageutils.GetE2EImage(imageutils.Agnhost)

const (
	// defaultNodeAllocatableCgroup is the default top-level cgroup under which pod cgroups are created.
	defaultNodeAllocatableCgroup = "kubepods"

	// Defaults for connecting to the kubelet pod-resources socket.
	defaultPodResourcesPath    = "/var/lib/kubelet/pod-resources"
	defaultPodResourcesTimeout = 10 * time.Second
	defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 MiB

	// State files written by the kubelet resource managers.
	cpuManagerStateFile    = "/var/lib/kubelet/cpu_manager_state"
	memoryManagerStateFile = "/var/lib/kubelet/memory_manager_state"
)

var (
	kubeletHealthCheckURL    = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
	containerRuntimeUnitName = ""

	kubeletCfg *kubeletconfig.KubeletConfiguration
)

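// getNodeSummary queries the kubelet read-only /stats/summary endpoint on the
// local node and decodes the response into a stats.Summary.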
func getNodeSummary(ctx context.Context) (*stats.Summary, error) {
	kubeletConfig, err := getCurrentKubeletConfig(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get current kubelet config: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil)
	if err != nil {
		return nil, fmt.Errorf("failed to build http request: %w", err)
	}
	req.Header.Add("Accept", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to get /stats/summary: %w", err)
	}

	defer resp.Body.Close()
	contentsBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read /stats/summary: %+v", resp)
	}

	decoder := json.NewDecoder(strings.NewReader(string(contentsBytes)))
	summary := stats.Summary{}
	err = decoder.Decode(&summary)
	if err != nil {
		return nil, fmt.Errorf("failed to parse /stats/summary to go struct: %+v", resp)
	}
	return &summary, nil
}

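// getV1alpha1NodeDevices lists pod resources via the v1alpha1 pod-resources API
// exposed on the local kubelet socket.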
func getV1alpha1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1alpha1.ListPodResourcesResponse, error) {
	endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
	if err != nil {
		return nil, fmt.Errorf("error getting local endpoint: %w", err)
	}
	client, conn, err := podresources.GetV1alpha1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
	if err != nil {
		return nil, fmt.Errorf("error getting gRPC client: %w", err)
	}
	defer conn.Close()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	resp, err := client.List(ctx, &kubeletpodresourcesv1alpha1.ListPodResourcesRequest{})
	if err != nil {
		return nil, fmt.Errorf("%v.List(_) = _, %v", client, err)
	}
	return resp, nil
}

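// getV1NodeDevices lists pod resources via the v1 pod-resources API exposed on
// the local kubelet socket.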
func getV1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1.ListPodResourcesResponse, error) {
	endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
	if err != nil {
		return nil, fmt.Errorf("error getting local endpoint: %w", err)
	}
	client, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
	if err != nil {
		return nil, fmt.Errorf("error getting gRPC client: %w", err)
	}
	defer conn.Close()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	resp, err := client.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
	if err != nil {
		return nil, fmt.Errorf("%v.List(_) = _, %v", client, err)
	}
	return resp, nil
}

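// getCurrentKubeletConfig returns the KubeletConfiguration currently used by the
// local node under test.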
func getCurrentKubeletConfig(ctx context.Context) (*kubeletconfig.KubeletConfiguration, error) {
	return e2enodekubelet.GetCurrentKubeletConfig(ctx, framework.TestContext.NodeName, "", false, framework.TestContext.StandaloneMode)
}

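// cleanupPods registers an AfterEach hook that synchronously deletes every pod
// the test created in the framework's namespace.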
func cleanupPods(f *framework.Framework) {
	ginkgo.AfterEach(func(ctx context.Context) {
		ginkgo.By("Deleting any Pods created by the test in namespace: " + f.Namespace.Name)
		l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{})
		framework.ExpectNoError(err)
		for _, p := range l.Items {
			if p.Namespace != f.Namespace.Name {
				continue
			}
			framework.Logf("Deleting pod: %s", p.Name)
			e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute)
		}
	})
}

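// tempSetCurrentKubeletConfig registers BeforeEach/AfterEach hooks that let a test
// mutate the kubelet configuration for its duration: updateFunction receives a copy
// of the current configuration to modify, the kubelet is restarted with the result,
// and the original configuration is restored afterwards.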
func tempSetCurrentKubeletConfig(f *framework.Framework, updateFunction func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration)) {
	var oldCfg *kubeletconfig.KubeletConfiguration

	ginkgo.BeforeEach(func(ctx context.Context) {
		var err error
		oldCfg, err = getCurrentKubeletConfig(ctx)
		framework.ExpectNoError(err)

		newCfg := oldCfg.DeepCopy()
		updateFunction(ctx, newCfg)
		if apiequality.Semantic.DeepEqual(*newCfg, *oldCfg) {
			return
		}

		updateKubeletConfig(ctx, f, newCfg, true)
	})

	ginkgo.AfterEach(func(ctx context.Context) {
		if oldCfg != nil {
			// Restore the original kubelet configuration.
			updateKubeletConfig(ctx, f, oldCfg, true)
		}
	})
}

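// updateKubeletConfig stops the kubelet, optionally removes the CPU and memory
// manager state files, writes the given configuration, and starts the kubelet again.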
func updateKubeletConfig(ctx context.Context, f *framework.Framework, kubeletConfig *kubeletconfig.KubeletConfiguration, deleteStateFiles bool) {
	ginkgo.By("Stopping the kubelet")
	startKubelet := stopKubelet()

	// Wait until the health check reports the kubelet as down.
	gomega.Eventually(ctx, func() bool {
		return kubeletHealthCheck(kubeletHealthCheckURL)
	}, time.Minute, time.Second).Should(gomega.BeFalse())

	// Remove the resource manager state files if requested; stale state can conflict
	// with the new configuration.
	if deleteStateFiles {
		deleteStateFile(cpuManagerStateFile)
		deleteStateFile(memoryManagerStateFile)
	}

	framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(kubeletConfig))

	ginkgo.By("Starting the kubelet")
	startKubelet()
	waitForKubeletToStart(ctx, f)
}

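// waitForKubeletToStart blocks until the kubelet health check succeeds and the
// node reports Ready.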
func waitForKubeletToStart(ctx context.Context, f *framework.Framework) {
	// Wait until the kubelet health check passes.
	gomega.Eventually(ctx, func() bool {
		return kubeletHealthCheck(kubeletHealthCheckURL)
	}, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue())

	// Wait until the local node reports Ready.
	gomega.Eventually(ctx, func(ctx context.Context) bool {
		nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		return nodes == 1
	}, time.Minute, time.Second).Should(gomega.BeTrue())
}

func deleteStateFile(stateFileName string) {
	err := exec.Command("/bin/sh", "-c", fmt.Sprintf("rm -f %s", stateFileName)).Run()
	framework.ExpectNoError(err, "failed to delete the state file")
}

func listNamespaceEvents(ctx context.Context, c clientset.Interface, ns string) error {
	ls, err := c.CoreV1().Events(ns).List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	for _, event := range ls.Items {
		klog.Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
	}
	return nil
}

func logPodEvents(ctx context.Context, f *framework.Framework) {
	framework.Logf("Summary of pod events during the test:")
	err := listNamespaceEvents(ctx, f.ClientSet, f.Namespace.Name)
	framework.ExpectNoError(err)
}

func logNodeEvents(ctx context.Context, f *framework.Framework) {
	framework.Logf("Summary of node events during the test:")
	err := listNamespaceEvents(ctx, f.ClientSet, "")
	framework.ExpectNoError(err)
}

func getLocalNode(ctx context.Context, f *framework.Framework) *v1.Node {
	nodeList, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
	framework.ExpectNoError(err)
	gomega.Expect(nodeList.Items).Should(gomega.HaveLen(1), "Unexpected number of node objects for node e2e. Expects only one node.")
	return &nodeList.Items[0]
}

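// getLocalTestNode fetches the node object for the node under test and reports
// whether it is both Ready and schedulable, so callers can skip tests when the
// node is not usable.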
func getLocalTestNode(ctx context.Context, f *framework.Framework) (*v1.Node, bool) {
	node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, framework.TestContext.NodeName, metav1.GetOptions{})
	framework.ExpectNoError(err)
	ready := e2enode.IsNodeReady(node)
	schedulable := e2enode.IsNodeSchedulable(node)
	framework.Logf("node %q ready=%v schedulable=%v", node.Name, ready, schedulable)
	return node, ready && schedulable
}

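// logKubeletLatencyMetrics logs the kubelet latency metrics identified by
// metricNames, scraped from the kubelet's read-only /metrics endpoint.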
func logKubeletLatencyMetrics(ctx context.Context, metricNames ...string) {
	metricSet := sets.NewString()
	for _, key := range metricNames {
		metricSet.Insert(kubeletmetrics.KubeletSubsystem + "_" + key)
	}
	metric, err := e2emetrics.GrabKubeletMetricsWithoutProxy(ctx, fmt.Sprintf("%s:%d", nodeNameOrIP(), ports.KubeletReadOnlyPort), "/metrics")
	if err != nil {
		framework.Logf("Error getting kubelet metrics: %v", err)
	} else {
		framework.Logf("Kubelet Metrics: %+v", e2emetrics.GetKubeletLatencyMetrics(metric, metricSet))
	}
}

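// getCRIClient connects to the CRI runtime and image services configured for the
// test and returns clients for both.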
func getCRIClient() (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
	// connection timeout for CRI service connection
	const connectionTimeout = 2 * time.Minute
	runtimeEndpoint := framework.TestContext.ContainerRuntimeEndpoint
	r, err := remote.NewRemoteRuntimeService(runtimeEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider())
	if err != nil {
		return nil, nil, err
	}
	imageManagerEndpoint := runtimeEndpoint
	if framework.TestContext.ImageServiceEndpoint != "" {
		// The image service endpoint is the same as the container runtime endpoint
		// unless it is explicitly specified.
		imageManagerEndpoint = framework.TestContext.ImageServiceEndpoint
	}
	i, err := remote.NewRemoteImageService(imageManagerEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider())
	if err != nil {
		return nil, nil, err
	}
	return r, i, nil
}

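// findKubeletServiceName looks up the kubelet systemd unit via systemctl,
// optionally restricting the search to running units.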
func findKubeletServiceName(running bool) string {
	cmdLine := []string{
		"systemctl", "list-units", "*kubelet*",
	}
	if running {
		cmdLine = append(cmdLine, "--state=running")
	}
	stdout, err := exec.Command("sudo", cmdLine...).CombinedOutput()
	framework.ExpectNoError(err)
	regex := regexp.MustCompile(`(kubelet-\w+)`)
	matches := regex.FindStringSubmatch(string(stdout))
	gomega.Expect(matches).ToNot(gomega.BeEmpty(), "Expected to find a kubelet service in systemctl output: %q", stdout)
	kubeletServiceName := matches[0]
	framework.Logf("Found kubelet unit via systemctl: %v, %v", string(stdout), kubeletServiceName)
	return kubeletServiceName
}

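// findContainerRuntimeServiceName resolves the systemd unit name of the container
// runtime by looking up the unit that owns the runtime's PID.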
func findContainerRuntimeServiceName() (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	conn, err := dbus.NewWithContext(ctx)
	framework.ExpectNoError(err, "Failed to setup dbus connection")
	defer conn.Close()

	runtimePids, err := getPidsForProcess(framework.TestContext.ContainerRuntimeProcessName, framework.TestContext.ContainerRuntimePidFile)
	framework.ExpectNoError(err, "failed to get list of container runtime pids")
	gomega.Expect(runtimePids).To(gomega.HaveLen(1), "Unexpected number of container runtime pids. Expected 1 but got %v", len(runtimePids))

	containerRuntimePid := runtimePids[0]

	unitName, err := conn.GetUnitNameByPID(ctx, uint32(containerRuntimePid))
	framework.ExpectNoError(err, "Failed to get container runtime unit name")

	return unitName, nil
}

type containerRuntimeUnitOp int

const (
	startContainerRuntimeUnitOp containerRuntimeUnitOp = iota
	stopContainerRuntimeUnitOp
)

func performContainerRuntimeUnitOp(op containerRuntimeUnitOp) error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	conn, err := dbus.NewWithContext(ctx)
	framework.ExpectNoError(err, "Failed to setup dbus connection")
	defer conn.Close()

	if containerRuntimeUnitName == "" {
		containerRuntimeUnitName, err = findContainerRuntimeServiceName()
		framework.ExpectNoError(err, "Failed to find container runtime name")
	}

	reschan := make(chan string)

	switch op {
	case startContainerRuntimeUnitOp:
		_, err = conn.StartUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
	case stopContainerRuntimeUnitOp:
		_, err = conn.StopUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
	default:
		framework.Failf("Unexpected container runtime op: %v", op)
	}
	framework.ExpectNoError(err, "dbus connection error")

	job := <-reschan
	gomega.Expect(job).To(gomega.Equal("done"), "Expected job to complete with done")

	return nil
}

func stopContainerRuntime() error {
	return performContainerRuntimeUnitOp(stopContainerRuntimeUnitOp)
}

func startContainerRuntime() error {
	return performContainerRuntimeUnitOp(startContainerRuntimeUnitOp)
}

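// restartKubelet restarts the kubelet systemd unit. If running is true, only a
// running kubelet unit is considered; the unit's start-limit state is reset before
// the restart so systemd does not refuse it.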
func restartKubelet(running bool) {
	kubeletServiceName := findKubeletServiceName(running)

	stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput()
	framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %s", err, string(stdout))

	stdout, err = exec.Command("sudo", "systemctl", "restart", kubeletServiceName).CombinedOutput()
	framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %s", err, string(stdout))
}

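// stopKubelet kills the running kubelet unit and returns a function that restarts it.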
func stopKubelet() func() {
	kubeletServiceName := findKubeletServiceName(true)

	// Reset the start-limit state first, so the later restart is not rejected by systemd.
	stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput()
	framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %s", err, string(stdout))

	stdout, err = exec.Command("sudo", "systemctl", "kill", kubeletServiceName).CombinedOutput()
	framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %s", err, string(stdout))

	return func() {
		// Restart the kubelet service unit that was killed above.
		stdout, err := exec.Command("sudo", "systemctl", "restart", kubeletServiceName).CombinedOutput()
		framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %s", err, string(stdout))
	}
}

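// killKubelet sends the given signal (for example "SIGTERM" or "SIGSTOP") to the
// running kubelet unit.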
func killKubelet(sig string) {
	kubeletServiceName := findKubeletServiceName(true)

	// Reset the start-limit state so a later restart is not rejected by systemd.
	stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput()
	framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %s", err, string(stdout))

	stdout, err = exec.Command("sudo", "systemctl", "kill", "-s", sig, kubeletServiceName).CombinedOutput()
	framework.ExpectNoError(err, "Failed to send signal %s to kubelet with systemctl: %v, %s", sig, err, string(stdout))
}

func kubeletHealthCheck(url string) bool {
	insecureTransport := http.DefaultTransport.(*http.Transport).Clone()
	insecureTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	insecureHTTPClient := &http.Client{
		Transport: insecureTransport,
	}

	req, err := http.NewRequest("HEAD", url, nil)
	if err != nil {
		return false
	}
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", framework.TestContext.BearerToken))
	resp, err := insecureHTTPClient.Do(req)
	if err != nil {
		klog.Warningf("Health check on %q failed, error=%v", url, err)
		return false
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		klog.Warningf("Health check on %q failed, status=%d", url, resp.StatusCode)
		return false
	}
	return true
}

// toCgroupFsName converts a CgroupName to the name used on disk, honoring the
// configured cgroup driver.
func toCgroupFsName(cgroupName cm.CgroupName) string {
	if kubeletCfg.CgroupDriver == "systemd" {
		return cgroupName.ToSystemd()
	}
	return cgroupName.ToCgroupfs()
}

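// reduceAllocatableMemoryUsageIfCgroupv1 asks the kernel to reclaim memory in the
// node allocatable cgroup by writing to memory.force_empty; it is a no-op on
// cgroup v2, which has no such knob.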
func reduceAllocatableMemoryUsageIfCgroupv1() {
	if !IsCgroup2UnifiedMode() {
		cmd := fmt.Sprintf("echo 0 > /sys/fs/cgroup/memory/%s/memory.force_empty", toCgroupFsName(cm.NewCgroupName(cm.RootCgroupName, defaultNodeAllocatableCgroup)))
		_, err := exec.Command("sudo", "sh", "-c", cmd).CombinedOutput()
		framework.ExpectNoError(err)
	}
}

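// withFeatureGate sets the given feature gate to the desired value and returns a
// function that restores its previous value; callers should defer the returned
// function so the change does not leak into other tests.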
func withFeatureGate(feature featuregate.Feature, desired bool) func() {
	current := utilfeature.DefaultFeatureGate.Enabled(feature)
	utilfeature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=%v", string(feature), desired))
	return func() {
		utilfeature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=%v", string(feature), current))
	}
}

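// waitForAllContainerRemoval waits until the CRI runtime reports that all
// containers of the given pod have been removed.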
func waitForAllContainerRemoval(ctx context.Context, podName, podNS string) {
	rs, _, err := getCRIClient()
	framework.ExpectNoError(err)
	gomega.Eventually(ctx, func(ctx context.Context) error {
		containers, err := rs.ListContainers(ctx, &runtimeapi.ContainerFilter{
			LabelSelector: map[string]string{
				types.KubernetesPodNameLabel:      podName,
				types.KubernetesPodNamespaceLabel: podNS,
			},
		})
		if err != nil {
			return fmt.Errorf("got error waiting for all containers to be removed from CRI: %v", err)
		}

		if len(containers) > 0 {
			return fmt.Errorf("expected all containers to be removed from CRI but %v containers still remain. Containers: %+v", len(containers), containers)
		}
		return nil
	}, 2*time.Minute, 1*time.Second).Should(gomega.Succeed())
}

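// getPidsForProcess returns the PIDs for the named process, preferring the PID
// recorded in pidFile when one is provided and readable.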
func getPidsForProcess(name, pidFile string) ([]int, error) {
	if len(pidFile) > 0 {
		pid, err := getPidFromPidFile(pidFile)
		if err == nil {
			return []int{pid}, nil
		}
		// Log the error and fall back to scanning procfs by process name.
		runtime.HandleError(err)
	}
	return procfs.PidOf(name)
}

func getPidFromPidFile(pidFile string) (int, error) {
	file, err := os.Open(pidFile)
	if err != nil {
		return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
	}
	defer file.Close()

	data, err := io.ReadAll(file)
	if err != nil {
		return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
	}

	pid, err := strconv.Atoi(string(data))
	if err != nil {
		return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
	}

	return pid, nil
}

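// WaitForPodInitContainerRestartCount waits for the init container at the given
// index to reach at least desiredRestartCount restarts.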
func WaitForPodInitContainerRestartCount(ctx context.Context, c clientset.Interface, namespace, podName string, initContainerIndex int, desiredRestartCount int32, timeout time.Duration) error {
	conditionDesc := fmt.Sprintf("init container %d started", initContainerIndex)
	return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
		if initContainerIndex > len(pod.Status.InitContainerStatuses)-1 {
			return false, nil
		}
		containerStatus := pod.Status.InitContainerStatuses[initContainerIndex]
		return containerStatus.RestartCount >= desiredRestartCount, nil
	})
}

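// WaitForPodContainerRestartCount waits for the container at the given index to
// reach at least desiredRestartCount restarts.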
func WaitForPodContainerRestartCount(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, desiredRestartCount int32, timeout time.Duration) error {
	conditionDesc := fmt.Sprintf("container %d started", containerIndex)
	return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
		if containerIndex > len(pod.Status.ContainerStatuses)-1 {
			return false, nil
		}
		containerStatus := pod.Status.ContainerStatuses[containerIndex]
		return containerStatus.RestartCount >= desiredRestartCount, nil
	})
}

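// WaitForPodInitContainerToFail waits for the init container at the given index to
// be stuck waiting with the given reason while the pod is still Pending; any other
// pod phase is treated as an error.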
func WaitForPodInitContainerToFail(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error {
	conditionDesc := fmt.Sprintf("container %d failed with reason %s", containerIndex, reason)
	return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
		switch pod.Status.Phase {
		case v1.PodPending:
			if len(pod.Status.InitContainerStatuses) == 0 {
				return false, nil
			}
			containerStatus := pod.Status.InitContainerStatuses[containerIndex]
			if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == reason {
				return true, nil
			}
			return false, nil
		case v1.PodFailed, v1.PodRunning, v1.PodSucceeded:
			return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase)
		}
		return false, nil
	})
}

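// nodeNameOrIP returns the address used to reach the kubelet endpoints; the node
// e2e tests run on the node itself, so localhost is sufficient.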
func nodeNameOrIP() string {
	return "localhost"
}