package node

import (
	"context"
	"fmt"
	"net"
	"sort"
	"strconv"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
	"k8s.io/kubernetes/test/e2e/nodefeature"
	testutils "k8s.io/kubernetes/test/utils"
	admissionapi "k8s.io/pod-security-admission/api"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)

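// This test checks that node-problem-detector (NPD) runs without error on up
// to maxNodesToProcess nodes in the cluster: it verifies the NPD process (or
// pod) is healthy, injects a kernel log entry, and confirms the resulting
// condition and events reach the API server. NPD's detection logic itself is
// covered by the e2e_node tests.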
var _ = SIGDescribe("NodeProblemDetector", nodefeature.NodeProblemDetector, func() {
	const (
		pollInterval      = 1 * time.Second
		pollTimeout       = 1 * time.Minute
		maxNodesToProcess = 10
	)
	f := framework.NewDefaultFramework("node-problem-detector")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

	ginkgo.BeforeEach(func(ctx context.Context) {
		e2eskipper.SkipUnlessSSHKeyPresent()
		e2eskipper.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
		e2eskipper.SkipUnlessProviderIs("gce", "gke")
		e2eskipper.SkipUnlessNodeOSDistroIs("gci", "ubuntu")
		e2enode.WaitForTotalHealthy(ctx, f.ClientSet, time.Minute)
	})

	ginkgo.It("should run without error", func(ctx context.Context) {
		ginkgo.By("Getting all nodes and their SSH-able IP addresses")
		readyNodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
		framework.ExpectNoError(err)

		nodes := []v1.Node{}
		hosts := []string{}
		for _, node := range readyNodes.Items {
			host := ""
			for _, addr := range node.Status.Addresses {
				if addr.Type == v1.NodeExternalIP {
					host = net.JoinHostPort(addr.Address, "22")
					break
				}
			}

			if len(host) > 0 {
				nodes = append(nodes, node)
				hosts = append(hosts, host)
			}
		}

		if len(nodes) == 0 {
			ginkgo.Skip("Skipping test due to lack of ready nodes with public IP")
		}

		if len(nodes) > maxNodesToProcess {
			nodes = nodes[:maxNodesToProcess]
			hosts = hosts[:maxNodesToProcess]
		}

		isStandaloneMode := make(map[string]bool)
		cpuUsageStats := make(map[string][]float64)
		uptimeStats := make(map[string][]float64)
		rssStats := make(map[string][]float64)
		workingSetStats := make(map[string][]float64)

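		// There is no easy way to tell exactly when this test started, and some
		// suites run for hours, so the KubeletStart event may already have been
		// garbage-collected. As a heuristic, only check for the KubeletStart
		// event when NPD itself started within the last hour.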
		checkForKubeletStart := false

		for _, host := range hosts {
			cpuUsageStats[host] = []float64{}
			uptimeStats[host] = []float64{}
			rssStats[host] = []float64{}
			workingSetStats[host] = []float64{}

			cmd := "systemctl status node-problem-detector.service"
			result, err := e2essh.SSH(ctx, cmd, host, framework.TestContext.Provider)
			isStandaloneMode[host] = (err == nil && result.Code == 0)

			if isStandaloneMode[host] {
				ginkgo.By(fmt.Sprintf("Check node %q has node-problem-detector process", host))
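				// The brackets in "[n]ode-problem-detector" keep the grep command
				// itself out of the ps output: the literal string does not match
				// the bracketed pattern, so only the real NPD process is listed.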
				psCmd := "ps aux | grep [n]ode-problem-detector"
				result, err = e2essh.SSH(ctx, psCmd, host, framework.TestContext.Provider)
				framework.ExpectNoError(err)
				gomega.Expect(result.Code).To(gomega.Equal(0))
				gomega.Expect(result.Stdout).To(gomega.ContainSubstring("node-problem-detector"))

				ginkgo.By(fmt.Sprintf("Check node-problem-detector is running fine on node %q", host))
				journalctlCmd := "sudo journalctl -r -u node-problem-detector"
				result, err = e2essh.SSH(ctx, journalctlCmd, host, framework.TestContext.Provider)
				framework.ExpectNoError(err)
				gomega.Expect(result.Code).To(gomega.Equal(0))
				gomega.Expect(result.Stdout).NotTo(gomega.ContainSubstring("node-problem-detector.service: Failed"))

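				// Record when NPD started so we can decide below whether the
				// KubeletStart event check makes sense for this run.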
				ginkgo.By(fmt.Sprintf("Check when node-problem-detector started on node %q", host))
				npdStartTimeCommand := "sudo systemctl show --timestamp=utc node-problem-detector -P ActiveEnterTimestamp"
				result, err = e2essh.SSH(ctx, npdStartTimeCommand, host, framework.TestContext.Provider)
				framework.ExpectNoError(err)
				gomega.Expect(result.Code).To(gomega.Equal(0))

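				// ActiveEnterTimestamp looks like "Thu 2022-06-23 17:10:13 UTC"; parse
				// it with the matching Go reference layout. If parsing fails, log it
				// and skip the KubeletStart check rather than failing the test.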
				st, err := time.Parse("Mon 2006-01-02 15:04:05 MST", strings.TrimSpace(result.Stdout))
				if err != nil {
					framework.Logf("Failed to parse when NPD started. Got exit code: %v and stdout: %v, error: %v. Will skip check for kubelet start event.", result.Code, result.Stdout, err)
				} else {
					checkForKubeletStart = time.Since(st) < time.Hour
				}

				cpuUsage, uptime := getCPUStat(ctx, f, host)
				cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
				uptimeStats[host] = append(uptimeStats[host], uptime)
			}
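			// Writing a "task ... blocked for more than 120 seconds" line to
			// /dev/kmsg makes NPD's kernel monitor report a DockerHung event and
			// set the KernelDeadlock condition, which the checks below wait for.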
			ginkgo.By(fmt.Sprintf("Inject log to trigger DockerHung on node %q", host))
			log := "INFO: task docker:12345 blocked for more than 120 seconds."
			injectLogCmd := "sudo sh -c \"echo 'kernel: " + log + "' >> /dev/kmsg\""
			result, err = e2essh.SSH(ctx, injectLogCmd, host, framework.TestContext.Provider)
			framework.ExpectNoError(err)
			gomega.Expect(result.Code).To(gomega.Equal(0))
		}

		ginkgo.By("Check node-problem-detector can post conditions and events to API server")
		for _, node := range nodes {
			ginkgo.By(fmt.Sprintf("Check node-problem-detector posted KernelDeadlock condition on node %q", node.Name))
			gomega.Eventually(ctx, func() error {
				return verifyNodeCondition(ctx, f, "KernelDeadlock", v1.ConditionTrue, "DockerHung", node.Name)
			}, pollTimeout, pollInterval).Should(gomega.Succeed())

			ginkgo.By(fmt.Sprintf("Check node-problem-detector posted DockerHung event on node %q", node.Name))
			eventListOptions := metav1.ListOptions{FieldSelector: fields.Set{"involvedObject.kind": "Node"}.AsSelector().String()}
			gomega.Eventually(ctx, func(ctx context.Context) error {
				return verifyEvents(ctx, f, eventListOptions, 1, "DockerHung", node.Name)
			}, pollTimeout, pollInterval).Should(gomega.Succeed())

			if checkForKubeletStart {
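				// NPD reports a KubeletStart event each time the kubelet (re)starts.
				// The kubelet may restart several times after a node boots, so only
				// check that at least one such event exists rather than asserting an
				// exact count.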
				ginkgo.By(fmt.Sprintf("Check node-problem-detector posted KubeletStart event on node %q", node.Name))
				gomega.Eventually(ctx, func(ctx context.Context) error {
					return verifyEventExists(ctx, f, eventListOptions, "KubeletStart", node.Name)
				}, pollTimeout, pollInterval).Should(gomega.Succeed())
			} else {
				ginkgo.By("KubeletStart event will NOT be checked")
			}
		}

		ginkgo.By("Gather node-problem-detector cpu and memory stats")
		numIterations := 60
		for i := 1; i <= numIterations; i++ {
			for j, host := range hosts {
				if isStandaloneMode[host] {
					rss, workingSet := getMemoryStat(ctx, f, host)
					rssStats[host] = append(rssStats[host], rss)
					workingSetStats[host] = append(workingSetStats[host], workingSet)
					if i == numIterations {
						cpuUsage, uptime := getCPUStat(ctx, f, host)
						cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
						uptimeStats[host] = append(uptimeStats[host], uptime)
					}
				} else {
					cpuUsage, rss, workingSet := getNpdPodStat(ctx, f, nodes[j].Name)
					cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
					rssStats[host] = append(rssStats[host], rss)
					workingSetStats[host] = append(workingSetStats[host], workingSet)
				}
			}
			time.Sleep(time.Second)
		}

		cpuStatsMsg := "CPU (core):"
		rssStatsMsg := "RSS (MB):"
		workingSetStatsMsg := "WorkingSet (MB):"
		for i, host := range hosts {
			if isStandaloneMode[host] {
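				// In standalone mode NPD runs as a systemd service; derive its
				// average CPU usage from the difference between the two cgroup
				// samples taken at the start and at the end of the run.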
				cpuUsage := cpuUsageStats[host][1] - cpuUsageStats[host][0]
				totaltime := uptimeStats[host][1] - uptimeStats[host][0]
				cpuStatsMsg += fmt.Sprintf(" %s[%.3f];", nodes[i].Name, cpuUsage/totaltime)
			} else {
				sort.Float64s(cpuUsageStats[host])
				cpuStatsMsg += fmt.Sprintf(" %s[%.3f|%.3f|%.3f];", nodes[i].Name,
					cpuUsageStats[host][0], cpuUsageStats[host][len(cpuUsageStats[host])/2], cpuUsageStats[host][len(cpuUsageStats[host])-1])
			}

			sort.Float64s(rssStats[host])
			rssStatsMsg += fmt.Sprintf(" %s[%.1f|%.1f|%.1f];", nodes[i].Name,
				rssStats[host][0], rssStats[host][len(rssStats[host])/2], rssStats[host][len(rssStats[host])-1])

			sort.Float64s(workingSetStats[host])
			workingSetStatsMsg += fmt.Sprintf(" %s[%.1f|%.1f|%.1f];", nodes[i].Name,
				workingSetStats[host][0], workingSetStats[host][len(workingSetStats[host])/2], workingSetStats[host][len(workingSetStats[host])-1])
		}
		framework.Logf("Node-Problem-Detector CPU and Memory Stats:\n\t%s\n\t%s\n\t%s", cpuStatsMsg, rssStatsMsg, workingSetStatsMsg)
	})
})

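// verifyEvents checks that the total count of events with the given reason,
// reported from the given node, matches num exactly.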
func verifyEvents(ctx context.Context, f *framework.Framework, options metav1.ListOptions, num int, reason, nodeName string) error {
	events, err := f.ClientSet.CoreV1().Events(metav1.NamespaceDefault).List(ctx, options)
	if err != nil {
		return err
	}
	count := 0
	for _, event := range events.Items {
		if event.Reason != reason || event.Source.Host != nodeName {
			continue
		}
		count += int(event.Count)
	}
	if count != num {
		return fmt.Errorf("expect event number %d, got %d: %v", num, count, events.Items)
	}
	return nil
}

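// verifyEventExists checks that at least one event with the given reason has
// been reported from the given node.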
func verifyEventExists(ctx context.Context, f *framework.Framework, options metav1.ListOptions, reason, nodeName string) error {
	events, err := f.ClientSet.CoreV1().Events(metav1.NamespaceDefault).List(ctx, options)
	if err != nil {
		return err
	}
	for _, event := range events.Items {
		if event.Reason == reason && event.Source.Host == nodeName && event.Count > 0 {
			return nil
		}
	}
	return fmt.Errorf("Event %s does not exist: %v", reason, events.Items)
}

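// verifyNodeCondition checks that the named node condition exists and has the
// expected status and reason.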
func verifyNodeCondition(ctx context.Context, f *framework.Framework, condition v1.NodeConditionType, status v1.ConditionStatus, reason, nodeName string) error {
	node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	_, c := testutils.GetNodeCondition(&node.Status, condition)
	if c == nil {
		return fmt.Errorf("node condition %q not found", condition)
	}
	if c.Status != status || c.Reason != reason {
		return fmt.Errorf("unexpected node condition %q: %+v", condition, c)
	}
	return nil
}

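// getMemoryStat returns the RSS and working-set memory of the
// node-problem-detector systemd service on the given host, in MB, read from
// the service's cgroup (v1 or v2).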
func getMemoryStat(ctx context.Context, f *framework.Framework, host string) (rss, workingSet float64) {
	var memCmd string

	isCgroupV2 := isHostRunningCgroupV2(ctx, f, host)
	if isCgroupV2 {
		memCmd = "cat /sys/fs/cgroup/system.slice/node-problem-detector.service/memory.current && cat /sys/fs/cgroup/system.slice/node-problem-detector.service/memory.stat"
	} else {
		memCmd = "cat /sys/fs/cgroup/memory/system.slice/node-problem-detector.service/memory.usage_in_bytes && cat /sys/fs/cgroup/memory/system.slice/node-problem-detector.service/memory.stat"
	}

	result, err := e2essh.SSH(ctx, memCmd, host, framework.TestContext.Provider)
	framework.ExpectNoError(err)
	gomega.Expect(result.Code).To(gomega.Equal(0))
	lines := strings.Split(result.Stdout, "\n")

	memoryUsage, err := strconv.ParseFloat(lines[0], 64)
	framework.ExpectNoError(err)

	var rssToken, inactiveFileToken string
	if isCgroupV2 {
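		// On cgroup v2, anonymous memory is reported as "anon" in memory.stat
		// (the closest equivalent of v1's total_rss), and "inactive_file"
		// replaces "total_inactive_file".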
		rssToken = "anon"
		inactiveFileToken = "inactive_file"
	} else {
		rssToken = "total_rss"
		inactiveFileToken = "total_inactive_file"
	}

	var totalInactiveFile float64
	for _, line := range lines[1:] {
		tokens := strings.Split(line, " ")

		if tokens[0] == rssToken {
			rss, err = strconv.ParseFloat(tokens[1], 64)
			framework.ExpectNoError(err)
		}
		if tokens[0] == inactiveFileToken {
			totalInactiveFile, err = strconv.ParseFloat(tokens[1], 64)
			framework.ExpectNoError(err)
		}
	}

	workingSet = memoryUsage
	if workingSet < totalInactiveFile {
		workingSet = 0
	} else {
		workingSet -= totalInactiveFile
	}

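	// Convert from bytes to MB.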
	rss = rss / 1024 / 1024
	workingSet = workingSet / 1024 / 1024
	return
}

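// getCPUStat reads a cumulative CPU usage counter from the host's cgroup
// hierarchy together with the host uptime (in seconds); the caller computes an
// average usage rate from the difference between two samples.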
func getCPUStat(ctx context.Context, f *framework.Framework, host string) (usage, uptime float64) {
	var cpuCmd string
	if isHostRunningCgroupV2(ctx, f, host) {
		cpuCmd = " cat /sys/fs/cgroup/cpu.stat | grep 'usage_usec' | sed 's/[^0-9]*//g' && cat /proc/uptime | awk '{print $1}'"
	} else {
		cpuCmd = "cat /sys/fs/cgroup/cpu/system.slice/node-problem-detector.service/cpuacct.usage && cat /proc/uptime | awk '{print $1}'"
	}

	result, err := e2essh.SSH(ctx, cpuCmd, host, framework.TestContext.Provider)
	framework.ExpectNoError(err)
	gomega.Expect(result.Code).To(gomega.Equal(0))
	lines := strings.Split(result.Stdout, "\n")

	usage, err = strconv.ParseFloat(lines[0], 64)
	framework.ExpectNoError(err, "Cannot parse float for usage")
	uptime, err = strconv.ParseFloat(lines[1], 64)
	framework.ExpectNoError(err, "Cannot parse float for uptime")

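	// cpuacct.usage is reported in nanoseconds; scale the counter to seconds.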
	usage *= 1e-9
	return
}

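// isHostRunningCgroupV2 reports whether the host mounts a unified (cgroup v2)
// hierarchy at /sys/fs/cgroup, based on the filesystem type reported by stat.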
func isHostRunningCgroupV2(ctx context.Context, f *framework.Framework, host string) bool {
	result, err := e2essh.SSH(ctx, "stat -fc %T /sys/fs/cgroup/", host, framework.TestContext.Provider)
	framework.ExpectNoError(err)
	gomega.Expect(result.Code).To(gomega.Equal(0))

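	// Depending on the stat implementation the type is printed either as the
	// name "cgroup2" or as the raw magic number 0x63677270 (CGROUP2_SUPER_MAGIC).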
	return strings.Contains(result.Stdout, "cgroup2") || strings.Contains(result.Stdout, "0x63677270")
}

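// getNpdPodStat returns the CPU usage (in cores), RSS and working set (in MB)
// of the node-problem-detector pod running on the given node, taken from the
// kubelet stats summary. It fails the test if no such pod is found.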
func getNpdPodStat(ctx context.Context, f *framework.Framework, nodeName string) (cpuUsage, rss, workingSet float64) {
	summary, err := e2ekubelet.GetStatsSummary(ctx, f.ClientSet, nodeName)
	framework.ExpectNoError(err)

	hasNpdPod := false
	for _, pod := range summary.Pods {
		if !strings.HasPrefix(pod.PodRef.Name, "node-problem-detector") {
			continue
		}
		if pod.CPU != nil && pod.CPU.UsageNanoCores != nil {
			cpuUsage = float64(*pod.CPU.UsageNanoCores) * 1e-9
		}
		if pod.Memory != nil {
			if pod.Memory.RSSBytes != nil {
				rss = float64(*pod.Memory.RSSBytes) / 1024 / 1024
			}
			if pod.Memory.WorkingSetBytes != nil {
				workingSet = float64(*pod.Memory.WorkingSetBytes) / 1024 / 1024
			}
		}
		hasNpdPod = true
		break
	}
	if !hasNpdPod {
		framework.Failf("No node-problem-detector pod is present in %+v", summary.Pods)
	}
	return
}