package e2enode

import (
	"context"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
	admissionapi "k8s.io/pod-security-admission/api"

	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2enodekubelet "k8s.io/kubernetes/test/e2e_node/kubeletconfig"
	"k8s.io/kubernetes/test/e2e_node/perf/workloads"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)
// makeNodePerfPod returns the pod that runs the given node performance workload.
func makeNodePerfPod(w workloads.NodePerfWorkload) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: fmt.Sprintf("%s-pod", w.Name()),
		},
		Spec: w.PodSpec(),
	}
}

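// setKubeletConfig restarts the kubelet with the given configuration (when
// cfg is non-nil) and waits for the node to report Ready again.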
func setKubeletConfig(ctx context.Context, f *framework.Framework, cfg *kubeletconfig.KubeletConfiguration) {
	if cfg != nil {
		// Update the kubelet configuration.
		ginkgo.By("Stopping the kubelet")
		startKubelet := stopKubelet()

		// Wait until the kubelet health check fails, which confirms it has stopped.
		gomega.Eventually(ctx, func() bool {
			return kubeletHealthCheck(kubeletHealthCheckURL)
		}, time.Minute, time.Second).Should(gomega.BeFalse())

		framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(cfg))

		ginkgo.By("Starting the kubelet")
		startKubelet()

		// Wait until the kubelet health check succeeds again.
		gomega.Eventually(ctx, func() bool {
			return kubeletHealthCheck(kubeletHealthCheckURL)
		}, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue())
	}

	// Wait for the node to report Ready.
	gomega.Eventually(ctx, func(ctx context.Context) bool {
		nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		return nodes == 1
	}, time.Minute, time.Second).Should(gomega.BeTrue())
}

// Serial because the test updates kubelet configuration.
// Slow because the benchmark workloads take a long time to complete.
var _ = SIGDescribe("Node Performance Testing", framework.WithSerial(), framework.WithSlow(), func() {
	f := framework.NewDefaultFramework("node-performance-testing")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
	var (
		wl     workloads.NodePerfWorkload
		oldCfg *kubeletconfig.KubeletConfiguration
		newCfg *kubeletconfig.KubeletConfiguration
		pod    *v1.Pod
	)
	ginkgo.JustBeforeEach(func(ctx context.Context) {
		err := wl.PreTestExec()
		framework.ExpectNoError(err)
		oldCfg, err = getCurrentKubeletConfig(ctx)
		framework.ExpectNoError(err)
		newCfg, err = wl.KubeletConfig(oldCfg)
		framework.ExpectNoError(err)
		setKubeletConfig(ctx, f, newCfg)
	})

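	// cleanup force-deletes the workload pod, runs the workload's post-test
	// hook, and restores the original kubelet configuration.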
	cleanup := func(ctx context.Context) {
		gp := int64(0)
		delOpts := metav1.DeleteOptions{
			GracePeriodSeconds: &gp,
		}
		e2epod.NewPodClient(f).DeleteSync(ctx, pod.Name, delOpts, e2epod.DefaultPodDeletionTimeout)

		// Give the CPU manager some extra time to finish any cleanup it needs
		// to do now that the pod has been deleted. Otherwise we can hit a race
		// in which PostTestExec deletes the CPU manager's checkpoint file while
		// the CPU manager is still doing work, leaving a new checkpoint file
		// behind after PostTestExec has finished; that would make the kubelet
		// panic once the kubelet configuration is restored below.
		time.Sleep(15 * time.Second)
		ginkgo.By("running the post test exec from the workload")
		err := wl.PostTestExec()
		framework.ExpectNoError(err)
		setKubeletConfig(ctx, f, oldCfg)
	}

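	// runWorkload creates the workload pod, waits for it to finish, and
	// extracts the performance measurement from its logs.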
	runWorkload := func(ctx context.Context) {
		ginkgo.By("running the workload and waiting for success")
		// Build the pod for the workload.
		pod = makeNodePerfPod(wl)
		// Create the pod and wait for it to start.
		pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
		// Wait for the pod to either succeed or fail; the logs are fetched
		// afterwards so they can be dumped if the workload failed.
		podErr := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, fmt.Sprintf("%s or %s", v1.PodSucceeded, v1.PodFailed), wl.Timeout(),
			func(pod *v1.Pod) (bool, error) {
				switch pod.Status.Phase {
				case v1.PodFailed:
					return true, fmt.Errorf("pod %q failed with reason: %q, message: %q", pod.Name, pod.Status.Reason, pod.Status.Message)
				case v1.PodSucceeded:
					return true, nil
				default:
					return false, nil
				}
			},
		)
		podLogs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name)
		framework.ExpectNoError(err)
		if podErr != nil {
			framework.Logf("dumping pod logs due to pod error detected: \n%s", podLogs)
			framework.Failf("pod error: %v", podErr)
		}
		perf, err := wl.ExtractPerformanceFromLogs(podLogs)
		framework.ExpectNoError(err)
		framework.Logf("Time to complete workload %s: %v", wl.Name(), perf)
		// Finally assert that the workload pod succeeded.
		gomega.Expect(podErr).To(gomega.Succeed(), "wait for pod %q to succeed", pod.Name)
	}

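	// Skip the tests on nodes that do not have enough allocatable CPU or
	// memory to run the benchmark workloads.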
	ginkgo.BeforeEach(func(ctx context.Context) {
		ginkgo.By("ensure environment has enough CPU + Memory to run")
		minimumRequiredCPU := resource.MustParse("15")
		minimumRequiredMemory := resource.MustParse("48Gi")
		localNodeCap := getLocalNode(ctx, f).Status.Allocatable
		cpuCap := localNodeCap[v1.ResourceCPU]
		memCap := localNodeCap[v1.ResourceMemory]
		if cpuCap.Cmp(minimumRequiredCPU) == -1 {
			e2eskipper.Skipf("Skipping Node Performance Tests: allocatable CPU %v is less than the required %v.", cpuCap, minimumRequiredCPU)
		}
		if memCap.Cmp(minimumRequiredMemory) == -1 {
			e2eskipper.Skipf("Skipping Node Performance Tests: allocatable memory %v is less than the required %v.", memCap, minimumRequiredMemory)
		}
	})

	ginkgo.Context("Run node performance testing with pre-defined workloads", func() {
		ginkgo.BeforeEach(func() {
			wl = workloads.NodePerfWorkloads[0]
		})
		ginkgo.It("NAS parallel benchmark (NPB) suite - Integer Sort (IS) workload", func(ctx context.Context) {
			ginkgo.DeferCleanup(cleanup)
			runWorkload(ctx)
		})
	})
	ginkgo.Context("Run node performance testing with pre-defined workloads", func() {
		ginkgo.BeforeEach(func() {
			wl = workloads.NodePerfWorkloads[1]
		})
		ginkgo.It("NAS parallel benchmark (NPB) suite - Embarrassingly Parallel (EP) workload", func(ctx context.Context) {
			ginkgo.DeferCleanup(cleanup)
			runWorkload(ctx)
		})
	})
	ginkgo.Context("Run node performance testing with pre-defined workloads", func() {
		ginkgo.BeforeEach(func() {
			wl = workloads.NodePerfWorkloads[2]
		})
		ginkgo.It("TensorFlow workload", func(ctx context.Context) {
			ginkgo.DeferCleanup(cleanup)
			runWorkload(ctx)
		})
	})
})