package node

import (
	"context"
	"fmt"
	"strings"
	"time"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/uuid"
	clientset "k8s.io/client-go/kubernetes"
	kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
	"k8s.io/kubernetes/test/e2e/feature"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2eperf "k8s.io/kubernetes/test/e2e/framework/perf"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	"k8s.io/kubernetes/test/e2e/perftype"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"

	"github.com/onsi/ginkgo/v2"
)

const (
	// containerStatsPollingPeriod is the interval at which the resource monitor polls container stats on each node.
	containerStatsPollingPeriod = 10 * time.Second
	// monitoringTime is the total duration over which resource usage is tracked in each test run.
	monitoringTime = 20 * time.Minute
	// reportingPeriod is the interval between periodic progress logs while monitoring.
	reportingPeriod = 5 * time.Minute
)

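// resourceTest describes one resource tracking scenario: the pod density to
// create per node and the per-container CPU/memory bounds the run must stay within.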
type resourceTest struct {
	podsPerNode int
	cpuLimits   e2ekubelet.ContainersCPUSummary
	memLimits   e2ekubelet.ResourceUsagePerContainer
}

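// logPodsOnNodes logs the number of pods the kubelet reports as running on each of the given nodes.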
func logPodsOnNodes(ctx context.Context, c clientset.Interface, nodeNames []string) {
	for _, n := range nodeNames {
		podList, err := e2ekubelet.GetKubeletRunningPods(ctx, c, n)
		if err != nil {
			framework.Logf("Unable to retrieve kubelet pods for node %v: %v", n, err)
			continue
		}
		framework.Logf("%d pods are running on node %v", len(podList.Items), n)
	}
}

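// runResourceTrackingTest creates podsPerNode pause pods on every node via a
// replication controller, monitors kubelet and runtime resource usage for
// monitoringTime, and then verifies the observed usage against the expected
// CPU and memory limits (skipping whichever of them is nil).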
func runResourceTrackingTest(ctx context.Context, f *framework.Framework, podsPerNode int, nodeNames sets.String, rm *e2ekubelet.ResourceMonitor,
	expectedCPU map[string]map[float64]float64, expectedMemory e2ekubelet.ResourceUsagePerContainer) {
	numNodes := nodeNames.Len()
	totalPods := podsPerNode * numNodes
	ginkgo.By(fmt.Sprintf("Creating a RC of %d pods and waiting until all pods of this RC are running", totalPods))
	rcName := fmt.Sprintf("resource%d-%s", totalPods, string(uuid.NewUUID()))

	// Create a replication controller of pause pods and wait until all of its pods are running.
	err := e2erc.RunRC(ctx, testutils.RCConfig{
		Client:    f.ClientSet,
		Name:      rcName,
		Namespace: f.Namespace.Name,
		Image:     imageutils.GetPauseImageName(),
		Replicas:  totalPods,
	})
	framework.ExpectNoError(err)

	// Log the current resource usage and reset the monitor before the measurement window starts.
	rm.LogLatest()
	rm.Reset()

	ginkgo.By("Start monitoring resource usage")
	// Periodically log progress and the pods running on each node until the
	// monitoring window ends; the resource monitor keeps polling in the background.
	deadline := time.Now().Add(monitoringTime)
	for time.Now().Before(deadline) && ctx.Err() == nil {
		timeLeft := time.Until(deadline)
		framework.Logf("Still running...%v left", timeLeft)
		if timeLeft < reportingPeriod {
			time.Sleep(timeLeft)
		} else {
			time.Sleep(reportingPeriod)
		}
		logPodsOnNodes(ctx, f.ClientSet, nodeNames.List())
	}

	ginkgo.By("Reporting overall resource usage")
	logPodsOnNodes(ctx, f.ClientSet, nodeNames.List())
	usageSummary, err := rm.GetLatest()
	framework.ExpectNoError(err)
	framework.Logf("%s", rm.FormatResourceUsage(usageSummary))

	// Emit structured perf data and verify memory usage against the expected limits.
	printPerfData(e2eperf.ResourceUsageToPerfData(rm.GetMasterNodeLatest(usageSummary)))
	verifyMemoryLimits(ctx, f.ClientSet, expectedMemory, usageSummary)

	cpuSummary := rm.GetCPUSummary()
	framework.Logf("%s", rm.FormatCPUSummary(cpuSummary))

	// Emit structured perf data and verify CPU usage against the expected limits.
	printPerfData(e2eperf.CPUUsageToPerfData(rm.GetMasterNodeCPUSummary(cpuSummary)))
	verifyCPULimits(expectedCPU, cpuSummary)

	ginkgo.By("Deleting the RC")
	framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, rcName))
}

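// verifyMemoryLimits checks the latest RSS memory usage of each expected
// container on every node against the given per-container limits and fails
// the test if any container exceeds its limit. A nil expected map disables the check.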
func verifyMemoryLimits(ctx context.Context, c clientset.Interface, expected e2ekubelet.ResourceUsagePerContainer, actual e2ekubelet.ResourceUsagePerNode) {
	if expected == nil {
		return
	}
	var errList []string
	for nodeName, nodeSummary := range actual {
		var nodeErrs []string
		for cName, expectedResult := range expected {
			container, ok := nodeSummary[cName]
			if !ok {
				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName))
				continue
			}

			expectedValue := expectedResult.MemoryRSSInBytes
			actualValue := container.MemoryRSSInBytes
			if expectedValue != 0 && actualValue > expectedValue {
				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected RSS memory (bytes) < %d; got %d",
					cName, expectedValue, actualValue))
			}
		}
		if len(nodeErrs) > 0 {
			errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
			// Dump the kubelet heap stats to help diagnose where the memory went.
			heapStats, err := e2ekubelet.GetKubeletHeapStats(ctx, c, nodeName)
			if err != nil {
				framework.Logf("Unable to get heap stats from %q: %v", nodeName, err)
			} else {
				framework.Logf("Heap stats on %q:\n%v", nodeName, heapStats)
			}
		}
	}
	if len(errList) > 0 {
		framework.Failf("Memory usage exceeding limits:\n %s", strings.Join(errList, "\n"))
	}
}

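// verifyCPULimits checks the measured CPU usage percentiles of each expected
// container on every node against the given limits and fails the test on any
// violation. A nil expected summary disables the check.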
func verifyCPULimits(expected e2ekubelet.ContainersCPUSummary, actual e2ekubelet.NodesCPUSummary) {
	if expected == nil {
		return
	}
	var errList []string
	for nodeName, perNodeSummary := range actual {
		var nodeErrs []string
		for cName, expectedResult := range expected {
			perContainerSummary, ok := perNodeSummary[cName]
			if !ok {
				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName))
				continue
			}
			for p, expectedValue := range expectedResult {
				actualValue, ok := perContainerSummary[p]
				if !ok {
					nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing percentile %v", cName, p))
					continue
				}
				if actualValue > expectedValue {
					nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected %.0fth%% usage < %.3f; got %.3f",
						cName, p*100, expectedValue, actualValue))
				}
			}
		}
		if len(nodeErrs) > 0 {
			errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
		}
	}
	if len(errList) > 0 {
		framework.Failf("CPU usage exceeding limits:\n %s", strings.Join(errList, "\n"))
	}
}

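// Kubelet resource usage tracking tests. They are marked Serial and Slow:
// each run creates pods across all schedulable nodes and then monitors
// kubelet and container runtime resource usage for monitoringTime.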
var _ = SIGDescribe("Kubelet", framework.WithSerial(), framework.WithSlow(), func() {
	var nodeNames sets.String
	f := framework.NewDefaultFramework("kubelet-perf")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
	var om *e2ekubelet.RuntimeOperationMonitor
	var rm *e2ekubelet.ResourceMonitor

	ginkgo.BeforeEach(func(ctx context.Context) {
		nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		nodeNames = sets.NewString()
		for _, node := range nodes.Items {
			nodeNames.Insert(node.Name)
		}
		om = e2ekubelet.NewRuntimeOperationMonitor(ctx, f.ClientSet)
		rm = e2ekubelet.NewResourceMonitor(f.ClientSet, e2ekubelet.TargetContainers(), containerStatsPollingPeriod)
		rm.Start(ctx)
	})

	ginkgo.AfterEach(func(ctx context.Context) {
		rm.Stop()
		result := om.GetLatestRuntimeOperationErrorRate(ctx)
		framework.Logf("runtime operation error metrics:\n%s", e2ekubelet.FormatRuntimeOperationErrorRate(result))
	})

	f.Describe("regular resource usage tracking", feature.RegularResourceUsageTracking, func() {
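		// Expected CPU usage percentiles and memory (RSS) limits for the kubelet
		// and container runtime system containers, both at idle (0 pods per node)
		// and at 100 pods per node. A run fails if observed usage exceeds these bounds.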
		rTests := []resourceTest{
			{
				podsPerNode: 0,
				cpuLimits: e2ekubelet.ContainersCPUSummary{
					kubeletstatsv1alpha1.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.20},
					kubeletstatsv1alpha1.SystemContainerRuntime: {0.50: 0.10, 0.95: 0.20},
				},
				memLimits: e2ekubelet.ResourceUsagePerContainer{
					kubeletstatsv1alpha1.SystemContainerKubelet: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 200 * 1024 * 1024},
					kubeletstatsv1alpha1.SystemContainerRuntime: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 125 * 1024 * 1024},
				},
			},
			{
				podsPerNode: 100,
				cpuLimits: e2ekubelet.ContainersCPUSummary{
					kubeletstatsv1alpha1.SystemContainerKubelet: {0.50: 0.35, 0.95: 0.50},
					kubeletstatsv1alpha1.SystemContainerRuntime: {0.50: 0.10, 0.95: 0.50},
				},
				memLimits: e2ekubelet.ResourceUsagePerContainer{
					kubeletstatsv1alpha1.SystemContainerKubelet: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024},
					kubeletstatsv1alpha1.SystemContainerRuntime: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 350 * 1024 * 1024},
				},
			},
		}
		for _, testArg := range rTests {
			itArg := testArg
			podsPerNode := itArg.podsPerNode
			name := fmt.Sprintf("resource tracking for %d pods per node", podsPerNode)
			ginkgo.It(name, func(ctx context.Context) {
				runResourceTrackingTest(ctx, f, podsPerNode, nodeNames, rm, itArg.cpuLimits, itArg.memLimits)
			})
		}
	})

	f.Describe("experimental resource usage tracking", feature.ExperimentalResourceUsageTracking, func() {
		density := []int{100}
		for i := range density {
			podsPerNode := density[i]
			name := fmt.Sprintf("resource tracking for %d pods per node", podsPerNode)
			ginkgo.It(name, func(ctx context.Context) {
				runResourceTrackingTest(ctx, f, podsPerNode, nodeNames, rm, nil, nil)
			})
		}
	})
})
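// printPerfData logs the perf data as pretty-printed JSON between the
// PerfResultTag and PerfResultEnd markers so that it can be extracted from the
// test output by performance analysis tooling.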
func printPerfData(p *perftype.PerfData) {
	if str := framework.PrettyPrintJSON(p); str != "" {
		framework.Logf("%s %s\n%s", perftype.PerfResultTag, str, perftype.PerfResultEnd)
	}
}