    17  package node
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"sort"
    24  	"strconv"
    25  	"strings"
    26  	"time"
    28  	v1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/fields"
    31  	"k8s.io/kubernetes/test/e2e/framework"
    32  	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
    33  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    34  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    35  	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
    36  	"k8s.io/kubernetes/test/e2e/nodefeature"
    37  	testutils "k8s.io/kubernetes/test/utils"
    38  	admissionapi "k8s.io/pod-security-admission/api"
    40  	"github.com/onsi/ginkgo/v2"
    41  	"github.com/onsi/gomega"
    42  )
// This test checks that node-problem-detector (NPD) runs without errors on
// up to 10 nodes in the cluster. NPD's functionality is tested in e2e_node tests.
    46  var _ = SIGDescribe("NodeProblemDetector", nodefeature.NodeProblemDetector, func() {
    47  	const (
    48  		pollInterval      = 1 * time.Second
    49  		pollTimeout       = 1 * time.Minute
    50  		maxNodesToProcess = 10
    51  	)
    52  	f := framework.NewDefaultFramework("node-problem-detector")
    53  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    55  	ginkgo.BeforeEach(func(ctx context.Context) {
    56  		e2eskipper.SkipUnlessSSHKeyPresent()
    57  		e2eskipper.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
    58  		e2eskipper.SkipUnlessProviderIs("gce", "gke")
    59  		e2eskipper.SkipUnlessNodeOSDistroIs("gci", "ubuntu")
    60  		e2enode.WaitForTotalHealthy(ctx, f.ClientSet, time.Minute)
    61  	})
    63  	ginkgo.It("should run without error", func(ctx context.Context) {
    64  		ginkgo.By("Getting all nodes and their SSH-able IP addresses")
    65  		readyNodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
    66  		framework.ExpectNoError(err)
    68  		nodes := []v1.Node{}
    69  		hosts := []string{}
    70  		for _, node := range readyNodes.Items {
    71  			host := ""
    72  			for _, addr := range node.Status.Addresses {
    73  				if addr.Type == v1.NodeExternalIP {
    74  					host = net.JoinHostPort(addr.Address, "22")
    75  					break
    76  				}
    77  			}
    78  			// Not every node has to have an external IP address.
    79  			if len(host) > 0 {
    80  				nodes = append(nodes, node)
    81  				hosts = append(hosts, host)
    82  			}
    83  		}
    85  		if len(nodes) == 0 {
    86  			ginkgo.Skip("Skipping test due to lack of ready nodes with public IP")
    87  		}
    89  		if len(nodes) > maxNodesToProcess {
    90  			nodes = nodes[:maxNodesToProcess]
    91  			hosts = hosts[:maxNodesToProcess]
    92  		}
    94  		isStandaloneMode := make(map[string]bool)
    95  		cpuUsageStats := make(map[string][]float64)
    96  		uptimeStats := make(map[string][]float64)
    97  		rssStats := make(map[string][]float64)
    98  		workingSetStats := make(map[string][]float64)
   100  		// Some tests suites running for days.
   101  		// This test is not marked as Disruptive or Serial so we do not want to
   102  		// restart the kubelet during the test to check for KubeletStart event
   103  		// detection. We use heuristic here to check if we need to validate for the
   104  		// KubeletStart event since there is no easy way to check when test has actually started.
   105  		checkForKubeletStart := false
   107  		for _, host := range hosts {
   108  			cpuUsageStats[host] = []float64{}
   109  			uptimeStats[host] = []float64{}
   110  			rssStats[host] = []float64{}
   111  			workingSetStats[host] = []float64{}
   113  			cmd := "systemctl status node-problem-detector.service"
   114  			result, err := e2essh.SSH(ctx, cmd, host, framework.TestContext.Provider)
   115  			isStandaloneMode[host] = (err == nil && result.Code == 0)
   117  			if isStandaloneMode[host] {
   118  				ginkgo.By(fmt.Sprintf("Check node %q has node-problem-detector process", host))
   119  				// Using brackets "[n]" is a trick to prevent grep command itself from
   120  				// showing up, because string text "[n]ode-problem-detector" does not
   121  				// match regular expression "[n]ode-problem-detector".
   122  				psCmd := "ps aux | grep [n]ode-problem-detector"
   123  				result, err = e2essh.SSH(ctx, psCmd, host, framework.TestContext.Provider)
   124  				framework.ExpectNoError(err)
   125  				gomega.Expect(result.Code).To(gomega.Equal(0))
   126  				gomega.Expect(result.Stdout).To(gomega.ContainSubstring("node-problem-detector"))
   128  				ginkgo.By(fmt.Sprintf("Check node-problem-detector is running fine on node %q", host))
   129  				journalctlCmd := "sudo journalctl -r -u node-problem-detector"
   130  				result, err = e2essh.SSH(ctx, journalctlCmd, host, framework.TestContext.Provider)
   131  				framework.ExpectNoError(err)
   132  				gomega.Expect(result.Code).To(gomega.Equal(0))
   133  				gomega.Expect(result.Stdout).NotTo(gomega.ContainSubstring("node-problem-detector.service: Failed"))
   135  				// We only will check for the KubeletStart even if parsing of date here succeeded.
   136  				ginkgo.By(fmt.Sprintf("Check when node-problem-detector started on node %q", host))
   137  				npdStartTimeCommand := "sudo systemctl show --timestamp=utc node-problem-detector -P ActiveEnterTimestamp"
   138  				result, err = e2essh.SSH(ctx, npdStartTimeCommand, host, framework.TestContext.Provider)
   139  				framework.ExpectNoError(err)
   140  				gomega.Expect(result.Code).To(gomega.Equal(0))
   142  				// The time format matches the systemd format.
   143  				// 'utc': 'Day YYYY-MM-DD HH:MM:SS UTC (see https://www.freedesktop.org/software/systemd/man/systemd.time.html)
   144  				st, err := time.Parse("Mon 2006-01-02 15:04:05 MST", result.Stdout)
   145  				if err != nil {
   146  					framework.Logf("Failed to parse when NPD started. Got exit code: %v and stdout: %v, error: %v. Will skip check for kubelet start event.", result.Code, result.Stdout, err)
   147  				} else {
   148  					checkForKubeletStart = time.Since(st) < time.Hour
   149  				}
   151  				cpuUsage, uptime := getCPUStat(ctx, f, host)
   152  				cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
   153  				uptimeStats[host] = append(uptimeStats[host], uptime)
   155  			}
   156  			ginkgo.By(fmt.Sprintf("Inject log to trigger DockerHung on node %q", host))
   157  			log := "INFO: task docker:12345 blocked for more than 120 seconds."
   158  			injectLogCmd := "sudo sh -c \"echo 'kernel: " + log + "' >> /dev/kmsg\""
   159  			result, err = e2essh.SSH(ctx, injectLogCmd, host, framework.TestContext.Provider)
   160  			framework.ExpectNoError(err)
   161  			gomega.Expect(result.Code).To(gomega.Equal(0))
   162  		}
   164  		ginkgo.By("Check node-problem-detector can post conditions and events to API server")
   165  		for _, node := range nodes {
   166  			ginkgo.By(fmt.Sprintf("Check node-problem-detector posted KernelDeadlock condition on node %q", node.Name))
   167  			gomega.Eventually(ctx, func() error {
   168  				return verifyNodeCondition(ctx, f, "KernelDeadlock", v1.ConditionTrue, "DockerHung", node.Name)
   169  			}, pollTimeout, pollInterval).Should(gomega.Succeed())
   171  			ginkgo.By(fmt.Sprintf("Check node-problem-detector posted DockerHung event on node %q", node.Name))
   172  			eventListOptions := metav1.ListOptions{FieldSelector: fields.Set{"involvedObject.kind": "Node"}.AsSelector().String()}
   173  			gomega.Eventually(ctx, func(ctx context.Context) error {
   174  				return verifyEvents(ctx, f, eventListOptions, 1, "DockerHung", node.Name)
   175  			}, pollTimeout, pollInterval).Should(gomega.Succeed())
   177  			if checkForKubeletStart {
   178  				// Node problem detector reports kubelet start events automatically starting from NPD v0.7.0+.
   179  				// Since Kubelet may be restarted for a few times after node is booted. We just check the event
   180  				// is detected, but do not check how many times Kubelet is started.
   181  				//
   182  				// Some test suites run for hours and KubeletStart event will already be cleaned up
   183  				ginkgo.By(fmt.Sprintf("Check node-problem-detector posted KubeletStart event on node %q", node.Name))
   184  				gomega.Eventually(ctx, func(ctx context.Context) error {
   185  					return verifyEventExists(ctx, f, eventListOptions, "KubeletStart", node.Name)
   186  				}, pollTimeout, pollInterval).Should(gomega.Succeed())
   187  			} else {
   188  				ginkgo.By("KubeletStart event will NOT be checked")
   189  			}
   190  		}
   192  		ginkgo.By("Gather node-problem-detector cpu and memory stats")
   193  		numIterations := 60
   194  		for i := 1; i <= numIterations; i++ {
   195  			for j, host := range hosts {
   196  				if isStandaloneMode[host] {
   197  					rss, workingSet := getMemoryStat(ctx, f, host)
   198  					rssStats[host] = append(rssStats[host], rss)
   199  					workingSetStats[host] = append(workingSetStats[host], workingSet)
   200  					if i == numIterations {
   201  						cpuUsage, uptime := getCPUStat(ctx, f, host)
   202  						cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
   203  						uptimeStats[host] = append(uptimeStats[host], uptime)
   204  					}
   205  				} else {
   206  					cpuUsage, rss, workingSet := getNpdPodStat(ctx, f, nodes[j].Name)
   207  					cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
   208  					rssStats[host] = append(rssStats[host], rss)
   209  					workingSetStats[host] = append(workingSetStats[host], workingSet)
   210  				}
   211  			}
   212  			time.Sleep(time.Second)
   213  		}
   215  		cpuStatsMsg := "CPU (core):"
   216  		rssStatsMsg := "RSS (MB):"
   217  		workingSetStatsMsg := "WorkingSet (MB):"
   218  		for i, host := range hosts {
   219  			if isStandaloneMode[host] {
   220  				// When in standalone mode, NPD is running as systemd service. We
   221  				// calculate its cpu usage from cgroup cpuacct value differences.
   222  				cpuUsage := cpuUsageStats[host][1] - cpuUsageStats[host][0]
   223  				totaltime := uptimeStats[host][1] - uptimeStats[host][0]
   224  				cpuStatsMsg += fmt.Sprintf(" %s[%.3f];", nodes[i].Name, cpuUsage/totaltime)
   225  			} else {
   226  				sort.Float64s(cpuUsageStats[host])
   227  				cpuStatsMsg += fmt.Sprintf(" %s[%.3f|%.3f|%.3f];", nodes[i].Name,
   228  					cpuUsageStats[host][0], cpuUsageStats[host][len(cpuUsageStats[host])/2], cpuUsageStats[host][len(cpuUsageStats[host])-1])
   229  			}
   231  			sort.Float64s(rssStats[host])
   232  			rssStatsMsg += fmt.Sprintf(" %s[%.1f|%.1f|%.1f];", nodes[i].Name,
   233  				rssStats[host][0], rssStats[host][len(rssStats[host])/2], rssStats[host][len(rssStats[host])-1])
   235  			sort.Float64s(workingSetStats[host])
   236  			workingSetStatsMsg += fmt.Sprintf(" %s[%.1f|%.1f|%.1f];", nodes[i].Name,
   237  				workingSetStats[host][0], workingSetStats[host][len(workingSetStats[host])/2], workingSetStats[host][len(workingSetStats[host])-1])
   238  		}
   239  		framework.Logf("Node-Problem-Detector CPU and Memory Stats:\n\t%s\n\t%s\n\t%s", cpuStatsMsg, rssStatsMsg, workingSetStatsMsg)
   240  	})
   241  })
   243  func verifyEvents(ctx context.Context, f *framework.Framework, options metav1.ListOptions, num int, reason, nodeName string) error {
   244  	events, err := f.ClientSet.CoreV1().Events(metav1.NamespaceDefault).List(ctx, options)
   245  	if err != nil {
   246  		return err
   247  	}
   248  	count := 0
   249  	for _, event := range events.Items {
   250  		if event.Reason != reason || event.Source.Host != nodeName {
   251  			continue
   252  		}
   253  		count += int(event.Count)
   254  	}
   255  	if count != num {
   256  		return fmt.Errorf("expect event number %d, got %d: %v", num, count, events.Items)
   257  	}
   258  	return nil
   259  }
   261  func verifyEventExists(ctx context.Context, f *framework.Framework, options metav1.ListOptions, reason, nodeName string) error {
   262  	events, err := f.ClientSet.CoreV1().Events(metav1.NamespaceDefault).List(ctx, options)
   263  	if err != nil {
   264  		return err
   265  	}
   266  	for _, event := range events.Items {
   267  		if event.Reason == reason && event.Source.Host == nodeName && event.Count > 0 {
   268  			return nil
   269  		}
   270  	}
   271  	return fmt.Errorf("Event %s does not exist: %v", reason, events.Items)
   272  }
   274  func verifyNodeCondition(ctx context.Context, f *framework.Framework, condition v1.NodeConditionType, status v1.ConditionStatus, reason, nodeName string) error {
   275  	node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
   276  	if err != nil {
   277  		return err
   278  	}
   279  	_, c := testutils.GetNodeCondition(&node.Status, condition)
   280  	if c == nil {
   281  		return fmt.Errorf("node condition %q not found", condition)
   282  	}
   283  	if c.Status != status || c.Reason != reason {
   284  		return fmt.Errorf("unexpected node condition %q: %+v", condition, c)
   285  	}
   286  	return nil
   287  }
   289  func getMemoryStat(ctx context.Context, f *framework.Framework, host string) (rss, workingSet float64) {
   290  	var memCmd string
   292  	isCgroupV2 := isHostRunningCgroupV2(ctx, f, host)
   293  	if isCgroupV2 {
   294  		memCmd = "cat /sys/fs/cgroup/system.slice/node-problem-detector.service/memory.current && cat /sys/fs/cgroup/system.slice/node-problem-detector.service/memory.stat"
   295  	} else {
   296  		memCmd = "cat /sys/fs/cgroup/memory/system.slice/node-problem-detector.service/memory.usage_in_bytes && cat /sys/fs/cgroup/memory/system.slice/node-problem-detector.service/memory.stat"
   297  	}
   299  	result, err := e2essh.SSH(ctx, memCmd, host, framework.TestContext.Provider)
   300  	framework.ExpectNoError(err)
   301  	gomega.Expect(result.Code).To(gomega.Equal(0))
   302  	lines := strings.Split(result.Stdout, "\n")
   304  	memoryUsage, err := strconv.ParseFloat(lines[0], 64)
   305  	framework.ExpectNoError(err)
   307  	var rssToken, inactiveFileToken string
   308  	if isCgroupV2 {
   309  		// Use Anon memory for RSS as cAdvisor on cgroupv2
   310  		// see https://github.com/google/cadvisor/blob/a9858972e75642c2b1914c8d5428e33e6392c08a/container/libcontainer/handler.go#L799
   311  		rssToken = "anon"
   312  		inactiveFileToken = "inactive_file"
   313  	} else {
   314  		rssToken = "total_rss"
   315  		inactiveFileToken = "total_inactive_file"
   316  	}
   318  	var totalInactiveFile float64
   319  	for _, line := range lines[1:] {
   320  		tokens := strings.Split(line, " ")
   322  		if tokens[0] == rssToken {
   323  			rss, err = strconv.ParseFloat(tokens[1], 64)
   324  			framework.ExpectNoError(err)
   325  		}
   326  		if tokens[0] == inactiveFileToken {
   327  			totalInactiveFile, err = strconv.ParseFloat(tokens[1], 64)
   328  			framework.ExpectNoError(err)
   329  		}
   330  	}
   332  	workingSet = memoryUsage
   333  	if workingSet < totalInactiveFile {
   334  		workingSet = 0
   335  	} else {
   336  		workingSet -= totalInactiveFile
   337  	}
   339  	// Convert to MB
   340  	rss = rss / 1024 / 1024
   341  	workingSet = workingSet / 1024 / 1024
   342  	return
   343  }
   345  func getCPUStat(ctx context.Context, f *framework.Framework, host string) (usage, uptime float64) {
   346  	var cpuCmd string
   347  	if isHostRunningCgroupV2(ctx, f, host) {
   348  		cpuCmd = " cat /sys/fs/cgroup/cpu.stat | grep 'usage_usec' | sed 's/[^0-9]*//g' && cat /proc/uptime | awk '{print $1}'"
   349  	} else {
   350  		cpuCmd = "cat /sys/fs/cgroup/cpu/system.slice/node-problem-detector.service/cpuacct.usage && cat /proc/uptime | awk '{print $1}'"
   351  	}
   353  	result, err := e2essh.SSH(ctx, cpuCmd, host, framework.TestContext.Provider)
   354  	framework.ExpectNoError(err)
   355  	gomega.Expect(result.Code).To(gomega.Equal(0))
   356  	lines := strings.Split(result.Stdout, "\n")
   358  	usage, err = strconv.ParseFloat(lines[0], 64)
   359  	framework.ExpectNoError(err, "Cannot parse float for usage")
   360  	uptime, err = strconv.ParseFloat(lines[1], 64)
   361  	framework.ExpectNoError(err, "Cannot parse float for uptime")
   363  	// Convert from nanoseconds to seconds
   364  	usage *= 1e-9
   365  	return
   366  }
   368  func isHostRunningCgroupV2(ctx context.Context, f *framework.Framework, host string) bool {
   369  	result, err := e2essh.SSH(ctx, "stat -fc %T /sys/fs/cgroup/", host, framework.TestContext.Provider)
   370  	framework.ExpectNoError(err)
   371  	gomega.Expect(result.Code).To(gomega.Equal(0))
   373  	// 0x63677270 == CGROUP2_SUPER_MAGIC
   374  	// https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html
   375  	return strings.Contains(result.Stdout, "cgroup2") || strings.Contains(result.Stdout, "0x63677270")
   376  }
   378  func getNpdPodStat(ctx context.Context, f *framework.Framework, nodeName string) (cpuUsage, rss, workingSet float64) {
   379  	summary, err := e2ekubelet.GetStatsSummary(ctx, f.ClientSet, nodeName)
   380  	framework.ExpectNoError(err)
   382  	hasNpdPod := false
   383  	for _, pod := range summary.Pods {
   384  		if !strings.HasPrefix(pod.PodRef.Name, "node-problem-detector") {
   385  			continue
   386  		}
   387  		if pod.CPU != nil && pod.CPU.UsageNanoCores != nil {
   388  			cpuUsage = float64(*pod.CPU.UsageNanoCores) * 1e-9
   389  		}
   390  		if pod.Memory != nil {
   391  			if pod.Memory.RSSBytes != nil {
   392  				rss = float64(*pod.Memory.RSSBytes) / 1024 / 1024
   393  			}
   394  			if pod.Memory.WorkingSetBytes != nil {
   395  				workingSet = float64(*pod.Memory.WorkingSetBytes) / 1024 / 1024
   396  			}
   397  		}
   398  		hasNpdPod = true
   399  		break
   400  	}
   401  	if !hasNpdPod {
   402  		framework.Failf("No node-problem-detector pod is present in %+v", summary.Pods)
   403  	}
   404  	return
   405  }

