package e2enode

import (
	"context"
	"fmt"
	"strings"
	"time"

	clientset "k8s.io/client-go/kubernetes"
	kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
	e2eperf "k8s.io/kubernetes/test/e2e/framework/perf"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"

	"github.com/onsi/ginkgo/v2"
)

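// This serial, slow suite measures the steady-state CPU and memory usage of the
// kubelet and the container runtime while batches of pause pods run on the node.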
var _ = SIGDescribe("Resource-usage", framework.WithSerial(), framework.WithSlow(), func() {
	const (
		// Interval at which the resource collector polls container stats.
		containerStatsPollingPeriod = 10 * time.Second
	)

	var (
		rc *ResourceCollector
		om *e2ekubelet.RuntimeOperationMonitor
	)

	f := framework.NewDefaultFramework("resource-usage")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

	ginkgo.BeforeEach(func(ctx context.Context) {
		om = e2ekubelet.NewRuntimeOperationMonitor(ctx, f.ClientSet)
		// The ResourceCollector gathers container stats from a standalone
		// cAdvisor pod (see getCadvisorPod), so create that pod before
		// constructing the collector.
		e2epod.NewPodClient(f).CreateSync(ctx, getCadvisorPod())
		rc = NewResourceCollector(containerStatsPollingPeriod)
	})

	ginkgo.AfterEach(func(ctx context.Context) {
		result := om.GetLatestRuntimeOperationErrorRate(ctx)
		framework.Logf("runtime operation error metrics:\n%s", e2ekubelet.FormatRuntimeOperationErrorRate(result))
	})

	// This context tracks steady-state resource usage and fails the test if the
	// kubelet or the container runtime exceeds the configured CPU or memory limits.
	ginkgo.Context("regular resource usage tracking", func() {
		rTests := []resourceTest{
			{
				podsNr: 10,
				cpuLimits: e2ekubelet.ContainersCPUSummary{
					kubeletstatsv1alpha1.SystemContainerKubelet: {0.50: 0.30, 0.95: 0.35},
					kubeletstatsv1alpha1.SystemContainerRuntime: {0.50: 0.30, 0.95: 0.40},
				},
				memLimits: e2ekubelet.ResourceUsagePerContainer{
					kubeletstatsv1alpha1.SystemContainerKubelet: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 200 * 1024 * 1024},
					kubeletstatsv1alpha1.SystemContainerRuntime: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 400 * 1024 * 1024},
				},
			},
		}

		for _, testArg := range rTests {
			itArg := testArg
			desc := fmt.Sprintf("resource tracking for %d pods per node", itArg.podsNr)
			ginkgo.It(desc, func(ctx context.Context) {
				testInfo := getTestNodeInfo(f, itArg.getTestName(), desc)

				runResourceUsageTest(ctx, f, rc, itArg)

				// Log and verify resource usage; the final argument enables limit checks.
				logAndVerifyResource(ctx, f, rc, itArg.cpuLimits, itArg.memLimits, testInfo, true)
			})
		}
	})

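	// This context records resource usage as perf data at several pod counts for
	// benchmarking; limits are not enforced here (isVerify is false below).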
	ginkgo.Context("regular resource usage tracking", func() {
		rTests := []resourceTest{
			{
				podsNr: 0,
			},
			{
				podsNr: 10,
			},
			{
				podsNr: 35,
			},
			{
				podsNr: 90,
			},
		}

		for _, testArg := range rTests {
			itArg := testArg
			desc := fmt.Sprintf("resource tracking for %d pods per node [Benchmark]", itArg.podsNr)
			ginkgo.It(desc, func(ctx context.Context) {
				testInfo := getTestNodeInfo(f, itArg.getTestName(), desc)

				runResourceUsageTest(ctx, f, rc, itArg)

				// Log resource usage only; limits are not verified in the benchmark runs.
				logAndVerifyResource(ctx, f, rc, itArg.cpuLimits, itArg.memLimits, testInfo, false)
			})
		}
	})
})

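// resourceTest describes one resource-usage test case: the number of pods to
// run on the node and, optionally, the CPU and memory limits to verify against.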
type resourceTest struct {
	podsNr    int
	cpuLimits e2ekubelet.ContainersCPUSummary
	memLimits e2ekubelet.ResourceUsagePerContainer
}

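// getTestName returns the name under which perf data for this test case is reported.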
func (rt *resourceTest) getTestName() string {
	return fmt.Sprintf("resource_%d", rt.podsNr)
}

// runResourceUsageTest creates a batch of pause pods on the node and then lets
// the resource collector sample usage for the full monitoring window.
func runResourceUsageTest(ctx context.Context, f *framework.Framework, rc *ResourceCollector, testArg resourceTest) {
	const (
		// Total time to monitor resource usage after the pods are up.
		monitoringTime = 10 * time.Minute
		// How often progress and the running pods are logged while monitoring.
		reportingPeriod = 5 * time.Minute
		// Time to wait after creating the pods so that startup spikes are
		// excluded from the measurement window.
		sleepAfterCreatePods = 10 * time.Second
	)
	pods := newTestPods(testArg.podsNr, true, imageutils.GetPauseImageName(), "test_pod")

	rc.Start()
	// Explicitly delete the test pods (plus the cAdvisor pod) so that namespace
	// cleanup does not time out.
	ginkgo.DeferCleanup(deletePodsSync, f, append(pods, getCadvisorPod()))
	ginkgo.DeferCleanup(rc.Stop)

	ginkgo.By("Creating a batch of Pods")
	e2epod.NewPodClient(f).CreateBatch(ctx, pods)

	// Let the node settle after pod creation.
	time.Sleep(sleepAfterCreatePods)

	// Log the current usage once, then reset the collector so the measurement
	// window starts here.
	rc.LogLatest()
	rc.Reset()

	ginkgo.By("Start monitoring resource usage")
	// Periodically log progress and the pods running on the node until the
	// deadline is reached; the collector keeps sampling in the background.
	deadline := time.Now().Add(monitoringTime)
	for time.Now().Before(deadline) && ctx.Err() == nil {
		timeLeft := time.Until(deadline)
		framework.Logf("Still running...%v left", timeLeft)
		if timeLeft < reportingPeriod {
			time.Sleep(timeLeft)
		} else {
			time.Sleep(reportingPeriod)
		}
		logPods(ctx, f.ClientSet)
	}

	ginkgo.By("Reporting overall resource usage")
	logPods(ctx, f.ClientSet)
}

// logAndVerifyResource prints the collected resource usage and CPU summary as
// perf data and, when isVerify is true, checks them against the given limits.
func logAndVerifyResource(ctx context.Context, f *framework.Framework, rc *ResourceCollector, cpuLimits e2ekubelet.ContainersCPUSummary,
	memLimits e2ekubelet.ResourceUsagePerContainer, testInfo map[string]string, isVerify bool) {
	nodeName := framework.TestContext.NodeName

	// Obtain the latest per-container memory usage.
	usagePerContainer, err := rc.GetLatest()
	framework.ExpectNoError(err)
	framework.Logf("%s", formatResourceUsageStats(usagePerContainer))

	usagePerNode := make(e2ekubelet.ResourceUsagePerNode)
	usagePerNode[nodeName] = usagePerContainer

	// Obtain the CPU usage summary (percentiles).
	cpuSummary := rc.GetCPUSummary()
	framework.Logf("%s", formatCPUSummary(cpuSummary))

	cpuSummaryPerNode := make(e2ekubelet.NodesCPUSummary)
	cpuSummaryPerNode[nodeName] = cpuSummary

	// Print the resource usage as perf data.
	logPerfData(e2eperf.ResourceUsageToPerfDataWithLabels(usagePerNode, testInfo), "memory")
	logPerfData(e2eperf.CPUUsageToPerfDataWithLabels(cpuSummaryPerNode, testInfo), "cpu")

	// Verify the resource usage against the limits when requested.
	if isVerify {
		verifyMemoryLimits(ctx, f.ClientSet, memLimits, usagePerNode)
		verifyCPULimits(cpuLimits, cpuSummaryPerNode)
	}
}

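// verifyMemoryLimits checks the latest RSS usage of each system container
// against the expected upper bound and fails the test on violations.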
func verifyMemoryLimits(ctx context.Context, c clientset.Interface, expected e2ekubelet.ResourceUsagePerContainer, actual e2ekubelet.ResourceUsagePerNode) {
	if expected == nil {
		return
	}
	var errList []string
	for nodeName, nodeSummary := range actual {
		var nodeErrs []string
		for cName, expectedResult := range expected {
			container, ok := nodeSummary[cName]
			if !ok {
				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName))
				continue
			}

			expectedValue := expectedResult.MemoryRSSInBytes
			actualValue := container.MemoryRSSInBytes
			if expectedValue != 0 && actualValue > expectedValue {
				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected RSS memory (bytes) < %d; got %d",
					cName, expectedValue, actualValue))
			}
		}
		if len(nodeErrs) > 0 {
			errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
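			// On failure, dump the kubelet's heap stats to help diagnose memory growth.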
			heapStats, err := e2ekubelet.GetKubeletHeapStats(ctx, c, nodeName)
			if err != nil {
				framework.Logf("Unable to get heap stats from %q", nodeName)
			} else {
				framework.Logf("Heap stats on %q:\n%v", nodeName, heapStats)
			}
		}
	}
	if len(errList) > 0 {
		framework.Failf("Memory usage exceeding limits:\n %s", strings.Join(errList, "\n"))
	}
}

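// verifyCPULimits checks the observed CPU usage percentiles of each system
// container against the expected upper bounds and fails the test on violations.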
func verifyCPULimits(expected e2ekubelet.ContainersCPUSummary, actual e2ekubelet.NodesCPUSummary) {
	if expected == nil {
		return
	}
	var errList []string
	for nodeName, perNodeSummary := range actual {
		var nodeErrs []string
		for cName, expectedResult := range expected {
			perContainerSummary, ok := perNodeSummary[cName]
			if !ok {
				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName))
				continue
			}
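			// Compare each expected percentile with the observed usage.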
			for p, expectedValue := range expectedResult {
				actualValue, ok := perContainerSummary[p]
				if !ok {
					nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing percentile %v", cName, p))
					continue
				}
				if actualValue > expectedValue {
					nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected %.0fth%% usage < %.3f; got %.3f",
						cName, p*100, expectedValue, actualValue))
				}
			}
		}
		if len(nodeErrs) > 0 {
			errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
		}
	}
	if len(errList) > 0 {
		framework.Failf("CPU usage exceeding limits:\n %s", strings.Join(errList, "\n"))
	}
}

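// logPods logs how many pods the kubelet reports as running on the test node.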
func logPods(ctx context.Context, c clientset.Interface) {
	nodeName := framework.TestContext.NodeName
	podList, err := e2ekubelet.GetKubeletRunningPods(ctx, c, nodeName)
	if err != nil {
		framework.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
		return
	}
	framework.Logf("%d pods are running on node %v", len(podList.Items), nodeName)
}