
Source file src/k8s.io/kubernetes/test/e2e/node/node_problem_detector.go

Documentation: k8s.io/kubernetes/test/e2e/node

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package node
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"sort"
    24  	"strconv"
    25  	"strings"
    26  	"time"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/fields"
    31  	"k8s.io/kubernetes/test/e2e/framework"
    32  	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
    33  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    34  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    35  	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
    36  	"k8s.io/kubernetes/test/e2e/nodefeature"
    37  	testutils "k8s.io/kubernetes/test/utils"
    38  	admissionapi "k8s.io/pod-security-admission/api"
    39  
    40  	"github.com/onsi/ginkgo/v2"
    41  	"github.com/onsi/gomega"
    42  )
    43  
    44  // This test checks that node-problem-detector (NPD) runs without error on up to
    45  // 10 nodes in the cluster. NPD's functionality itself is tested in the e2e_node tests.
    46  var _ = SIGDescribe("NodeProblemDetector", nodefeature.NodeProblemDetector, func() {
    47  	const (
    48  		pollInterval      = 1 * time.Second
    49  		pollTimeout       = 1 * time.Minute
    50  		maxNodesToProcess = 10
    51  	)
    52  	f := framework.NewDefaultFramework("node-problem-detector")
    53  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    54  
    55  	ginkgo.BeforeEach(func(ctx context.Context) {
    56  		e2eskipper.SkipUnlessSSHKeyPresent()
    57  		e2eskipper.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
    58  		e2eskipper.SkipUnlessProviderIs("gce", "gke")
    59  		e2eskipper.SkipUnlessNodeOSDistroIs("gci", "ubuntu")
    60  		e2enode.WaitForTotalHealthy(ctx, f.ClientSet, time.Minute)
    61  	})
    62  
    63  	ginkgo.It("should run without error", func(ctx context.Context) {
    64  		ginkgo.By("Getting all nodes and their SSH-able IP addresses")
    65  		readyNodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
    66  		framework.ExpectNoError(err)
    67  
    68  		nodes := []v1.Node{}
    69  		hosts := []string{}
    70  		for _, node := range readyNodes.Items {
    71  			host := ""
    72  			for _, addr := range node.Status.Addresses {
    73  				if addr.Type == v1.NodeExternalIP {
    74  					host = net.JoinHostPort(addr.Address, "22")
    75  					break
    76  				}
    77  			}
    78  			// Not every node has to have an external IP address.
    79  			if len(host) > 0 {
    80  				nodes = append(nodes, node)
    81  				hosts = append(hosts, host)
    82  			}
    83  		}
    84  
    85  		if len(nodes) == 0 {
    86  			ginkgo.Skip("Skipping test due to lack of ready nodes with public IP")
    87  		}
    88  
    89  		if len(nodes) > maxNodesToProcess {
    90  			nodes = nodes[:maxNodesToProcess]
    91  			hosts = hosts[:maxNodesToProcess]
    92  		}
    93  
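        		// Per-host resource-usage samples, keyed by the node's SSH address, collected while the test runs.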
    94  		isStandaloneMode := make(map[string]bool)
    95  		cpuUsageStats := make(map[string][]float64)
    96  		uptimeStats := make(map[string][]float64)
    97  		rssStats := make(map[string][]float64)
    98  		workingSetStats := make(map[string][]float64)
    99  
    100  		// Some test suites run for days.
    101  		// This test is not marked as Disruptive or Serial, so we do not want to
    102  		// restart the kubelet during the test to check for KubeletStart event
    103  		// detection. Instead, we use a heuristic to decide whether to validate the
    104  		// KubeletStart event, since there is no easy way to tell when the test actually started.
   105  		checkForKubeletStart := false
   106  
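         		// For each node, determine over SSH whether NPD runs in standalone mode as a
         		// systemd service (rather than as a pod), and inject a kernel log entry that
         		// should trigger the DockerHung problem.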
   107  		for _, host := range hosts {
   108  			cpuUsageStats[host] = []float64{}
   109  			uptimeStats[host] = []float64{}
   110  			rssStats[host] = []float64{}
   111  			workingSetStats[host] = []float64{}
   112  
   113  			cmd := "systemctl status node-problem-detector.service"
   114  			result, err := e2essh.SSH(ctx, cmd, host, framework.TestContext.Provider)
   115  			isStandaloneMode[host] = (err == nil && result.Code == 0)
   116  
   117  			if isStandaloneMode[host] {
   118  				ginkgo.By(fmt.Sprintf("Check node %q has node-problem-detector process", host))
    119  				// Using brackets "[n]" is a trick to prevent the grep command itself from
    120  				// showing up in the output: the literal text "[n]ode-problem-detector" in
    121  				// grep's own command line does not match the regular expression "[n]ode-problem-detector".
   122  				psCmd := "ps aux | grep [n]ode-problem-detector"
   123  				result, err = e2essh.SSH(ctx, psCmd, host, framework.TestContext.Provider)
   124  				framework.ExpectNoError(err)
   125  				gomega.Expect(result.Code).To(gomega.Equal(0))
   126  				gomega.Expect(result.Stdout).To(gomega.ContainSubstring("node-problem-detector"))
   127  
   128  				ginkgo.By(fmt.Sprintf("Check node-problem-detector is running fine on node %q", host))
   129  				journalctlCmd := "sudo journalctl -r -u node-problem-detector"
   130  				result, err = e2essh.SSH(ctx, journalctlCmd, host, framework.TestContext.Provider)
   131  				framework.ExpectNoError(err)
   132  				gomega.Expect(result.Code).To(gomega.Equal(0))
   133  				gomega.Expect(result.Stdout).NotTo(gomega.ContainSubstring("node-problem-detector.service: Failed"))
   134  
    135  				// We will only check for the KubeletStart event if parsing the date here succeeds.
   136  				ginkgo.By(fmt.Sprintf("Check when node-problem-detector started on node %q", host))
   137  				npdStartTimeCommand := "sudo systemctl show --timestamp=utc node-problem-detector -P ActiveEnterTimestamp"
   138  				result, err = e2essh.SSH(ctx, npdStartTimeCommand, host, framework.TestContext.Provider)
   139  				framework.ExpectNoError(err)
   140  				gomega.Expect(result.Code).To(gomega.Equal(0))
   141  
    142  				// The time format matches the systemd format with the 'utc' option:
    143  				// 'Day YYYY-MM-DD HH:MM:SS UTC' (see https://www.freedesktop.org/software/systemd/man/systemd.time.html).
   144  				st, err := time.Parse("Mon 2006-01-02 15:04:05 MST", result.Stdout)
   145  				if err != nil {
   146  					framework.Logf("Failed to parse when NPD started. Got exit code: %v and stdout: %v, error: %v. Will skip check for kubelet start event.", result.Code, result.Stdout, err)
   147  				} else {
   148  					checkForKubeletStart = time.Since(st) < time.Hour
   149  				}
   150  
   151  				cpuUsage, uptime := getCPUStat(ctx, f, host)
   152  				cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
   153  				uptimeStats[host] = append(uptimeStats[host], uptime)
   154  
   155  			}
   156  			ginkgo.By(fmt.Sprintf("Inject log to trigger DockerHung on node %q", host))
   157  			log := "INFO: task docker:12345 blocked for more than 120 seconds."
   158  			injectLogCmd := "sudo sh -c \"echo 'kernel: " + log + "' >> /dev/kmsg\""
   159  			result, err = e2essh.SSH(ctx, injectLogCmd, host, framework.TestContext.Provider)
   160  			framework.ExpectNoError(err)
   161  			gomega.Expect(result.Code).To(gomega.Equal(0))
   162  		}
   163  
   164  		ginkgo.By("Check node-problem-detector can post conditions and events to API server")
   165  		for _, node := range nodes {
   166  			ginkgo.By(fmt.Sprintf("Check node-problem-detector posted KernelDeadlock condition on node %q", node.Name))
   167  			gomega.Eventually(ctx, func() error {
   168  				return verifyNodeCondition(ctx, f, "KernelDeadlock", v1.ConditionTrue, "DockerHung", node.Name)
   169  			}, pollTimeout, pollInterval).Should(gomega.Succeed())
   170  
   171  			ginkgo.By(fmt.Sprintf("Check node-problem-detector posted DockerHung event on node %q", node.Name))
   172  			eventListOptions := metav1.ListOptions{FieldSelector: fields.Set{"involvedObject.kind": "Node"}.AsSelector().String()}
   173  			gomega.Eventually(ctx, func(ctx context.Context) error {
   174  				return verifyEvents(ctx, f, eventListOptions, 1, "DockerHung", node.Name)
   175  			}, pollTimeout, pollInterval).Should(gomega.Succeed())
   176  
   177  			if checkForKubeletStart {
    178  				// Node-problem-detector reports kubelet start events automatically starting with NPD v0.7.0.
    179  				// Since the kubelet may be restarted a few times after the node is booted, we only check
    180  				// that the event is detected; we do not check how many times the kubelet has started.
    181  				//
    182  				// Some test suites run for hours, in which case the KubeletStart event may already have been cleaned up.
   183  				ginkgo.By(fmt.Sprintf("Check node-problem-detector posted KubeletStart event on node %q", node.Name))
   184  				gomega.Eventually(ctx, func(ctx context.Context) error {
   185  					return verifyEventExists(ctx, f, eventListOptions, "KubeletStart", node.Name)
   186  				}, pollTimeout, pollInterval).Should(gomega.Succeed())
   187  			} else {
   188  				ginkgo.By("KubeletStart event will NOT be checked")
   189  			}
   190  		}
   191  
   192  		ginkgo.By("Gather node-problem-detector cpu and memory stats")
   193  		numIterations := 60
   194  		for i := 1; i <= numIterations; i++ {
   195  			for j, host := range hosts {
   196  				if isStandaloneMode[host] {
   197  					rss, workingSet := getMemoryStat(ctx, f, host)
   198  					rssStats[host] = append(rssStats[host], rss)
   199  					workingSetStats[host] = append(workingSetStats[host], workingSet)
   200  					if i == numIterations {
   201  						cpuUsage, uptime := getCPUStat(ctx, f, host)
   202  						cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
   203  						uptimeStats[host] = append(uptimeStats[host], uptime)
   204  					}
   205  				} else {
   206  					cpuUsage, rss, workingSet := getNpdPodStat(ctx, f, nodes[j].Name)
   207  					cpuUsageStats[host] = append(cpuUsageStats[host], cpuUsage)
   208  					rssStats[host] = append(rssStats[host], rss)
   209  					workingSetStats[host] = append(workingSetStats[host], workingSet)
   210  				}
   211  			}
   212  			time.Sleep(time.Second)
   213  		}
   214  
   215  		cpuStatsMsg := "CPU (core):"
   216  		rssStatsMsg := "RSS (MB):"
   217  		workingSetStatsMsg := "WorkingSet (MB):"
   218  		for i, host := range hosts {
   219  			if isStandaloneMode[host] {
    220  				// In standalone mode, NPD runs as a systemd service. We calculate its
    221  				// CPU usage from the difference between two cgroup cpuacct readings.
   222  				cpuUsage := cpuUsageStats[host][1] - cpuUsageStats[host][0]
   223  				totaltime := uptimeStats[host][1] - uptimeStats[host][0]
   224  				cpuStatsMsg += fmt.Sprintf(" %s[%.3f];", nodes[i].Name, cpuUsage/totaltime)
   225  			} else {
   226  				sort.Float64s(cpuUsageStats[host])
   227  				cpuStatsMsg += fmt.Sprintf(" %s[%.3f|%.3f|%.3f];", nodes[i].Name,
   228  					cpuUsageStats[host][0], cpuUsageStats[host][len(cpuUsageStats[host])/2], cpuUsageStats[host][len(cpuUsageStats[host])-1])
   229  			}
   230  
   231  			sort.Float64s(rssStats[host])
   232  			rssStatsMsg += fmt.Sprintf(" %s[%.1f|%.1f|%.1f];", nodes[i].Name,
   233  				rssStats[host][0], rssStats[host][len(rssStats[host])/2], rssStats[host][len(rssStats[host])-1])
   234  
   235  			sort.Float64s(workingSetStats[host])
   236  			workingSetStatsMsg += fmt.Sprintf(" %s[%.1f|%.1f|%.1f];", nodes[i].Name,
   237  				workingSetStats[host][0], workingSetStats[host][len(workingSetStats[host])/2], workingSetStats[host][len(workingSetStats[host])-1])
   238  		}
   239  		framework.Logf("Node-Problem-Detector CPU and Memory Stats:\n\t%s\n\t%s\n\t%s", cpuStatsMsg, rssStatsMsg, workingSetStatsMsg)
   240  	})
   241  })
   242  
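         // verifyEvents lists Node events in the default namespace and returns an error unless
         // the Count fields of events matching the given reason and source host sum to exactly num.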
   243  func verifyEvents(ctx context.Context, f *framework.Framework, options metav1.ListOptions, num int, reason, nodeName string) error {
   244  	events, err := f.ClientSet.CoreV1().Events(metav1.NamespaceDefault).List(ctx, options)
   245  	if err != nil {
   246  		return err
   247  	}
   248  	count := 0
   249  	for _, event := range events.Items {
   250  		if event.Reason != reason || event.Source.Host != nodeName {
   251  			continue
   252  		}
   253  		count += int(event.Count)
   254  	}
   255  	if count != num {
   256  		return fmt.Errorf("expect event number %d, got %d: %v", num, count, events.Items)
   257  	}
   258  	return nil
   259  }
   260  
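         // verifyEventExists returns nil if at least one event with the given reason, source host,
         // and a positive count exists; otherwise it returns an error listing the events found.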
   261  func verifyEventExists(ctx context.Context, f *framework.Framework, options metav1.ListOptions, reason, nodeName string) error {
   262  	events, err := f.ClientSet.CoreV1().Events(metav1.NamespaceDefault).List(ctx, options)
   263  	if err != nil {
   264  		return err
   265  	}
   266  	for _, event := range events.Items {
   267  		if event.Reason == reason && event.Source.Host == nodeName && event.Count > 0 {
   268  			return nil
   269  		}
   270  	}
    271  	return fmt.Errorf("event %s does not exist: %v", reason, events.Items)
   272  }
   273  
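         // verifyNodeCondition returns an error unless the node currently reports the given
         // condition with the expected status and reason.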
   274  func verifyNodeCondition(ctx context.Context, f *framework.Framework, condition v1.NodeConditionType, status v1.ConditionStatus, reason, nodeName string) error {
   275  	node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
   276  	if err != nil {
   277  		return err
   278  	}
   279  	_, c := testutils.GetNodeCondition(&node.Status, condition)
   280  	if c == nil {
   281  		return fmt.Errorf("node condition %q not found", condition)
   282  	}
   283  	if c.Status != status || c.Reason != reason {
   284  		return fmt.Errorf("unexpected node condition %q: %+v", condition, c)
   285  	}
   286  	return nil
   287  }
   288  
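         // getMemoryStat reads the node-problem-detector service's memory usage and memory.stat
         // from the cgroup filesystem over SSH and returns its RSS and working set in MB.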
   289  func getMemoryStat(ctx context.Context, f *framework.Framework, host string) (rss, workingSet float64) {
   290  	var memCmd string
   291  
   292  	isCgroupV2 := isHostRunningCgroupV2(ctx, f, host)
   293  	if isCgroupV2 {
   294  		memCmd = "cat /sys/fs/cgroup/system.slice/node-problem-detector.service/memory.current && cat /sys/fs/cgroup/system.slice/node-problem-detector.service/memory.stat"
   295  	} else {
   296  		memCmd = "cat /sys/fs/cgroup/memory/system.slice/node-problem-detector.service/memory.usage_in_bytes && cat /sys/fs/cgroup/memory/system.slice/node-problem-detector.service/memory.stat"
   297  	}
   298  
   299  	result, err := e2essh.SSH(ctx, memCmd, host, framework.TestContext.Provider)
   300  	framework.ExpectNoError(err)
   301  	gomega.Expect(result.Code).To(gomega.Equal(0))
   302  	lines := strings.Split(result.Stdout, "\n")
   303  
   304  	memoryUsage, err := strconv.ParseFloat(lines[0], 64)
   305  	framework.ExpectNoError(err)
   306  
   307  	var rssToken, inactiveFileToken string
   308  	if isCgroupV2 {
    309  		// Use anon memory for RSS, as cAdvisor does on cgroup v2; see
    310  		// https://github.com/google/cadvisor/blob/a9858972e75642c2b1914c8d5428e33e6392c08a/container/libcontainer/handler.go#L799
   311  		rssToken = "anon"
   312  		inactiveFileToken = "inactive_file"
   313  	} else {
   314  		rssToken = "total_rss"
   315  		inactiveFileToken = "total_inactive_file"
   316  	}
   317  
   318  	var totalInactiveFile float64
   319  	for _, line := range lines[1:] {
   320  		tokens := strings.Split(line, " ")
   321  
   322  		if tokens[0] == rssToken {
   323  			rss, err = strconv.ParseFloat(tokens[1], 64)
   324  			framework.ExpectNoError(err)
   325  		}
   326  		if tokens[0] == inactiveFileToken {
   327  			totalInactiveFile, err = strconv.ParseFloat(tokens[1], 64)
   328  			framework.ExpectNoError(err)
   329  		}
   330  	}
   331  
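         	// Working set is total memory usage minus the inactive file cache, clamped at
         	// zero, mirroring how cAdvisor and the kubelet compute it.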
   332  	workingSet = memoryUsage
   333  	if workingSet < totalInactiveFile {
   334  		workingSet = 0
   335  	} else {
   336  		workingSet -= totalInactiveFile
   337  	}
   338  
   339  	// Convert to MB
   340  	rss = rss / 1024 / 1024
   341  	workingSet = workingSet / 1024 / 1024
   342  	return
   343  }
   344  
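         // getCPUStat samples cumulative CPU usage from the cgroup filesystem and the host uptime
         // over SSH; the test diffs two samples from one host to estimate NPD's average CPU usage.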
   345  func getCPUStat(ctx context.Context, f *framework.Framework, host string) (usage, uptime float64) {
   346  	var cpuCmd string
   347  	if isHostRunningCgroupV2(ctx, f, host) {
   348  		cpuCmd = " cat /sys/fs/cgroup/cpu.stat | grep 'usage_usec' | sed 's/[^0-9]*//g' && cat /proc/uptime | awk '{print $1}'"
   349  	} else {
   350  		cpuCmd = "cat /sys/fs/cgroup/cpu/system.slice/node-problem-detector.service/cpuacct.usage && cat /proc/uptime | awk '{print $1}'"
   351  	}
   352  
   353  	result, err := e2essh.SSH(ctx, cpuCmd, host, framework.TestContext.Provider)
   354  	framework.ExpectNoError(err)
   355  	gomega.Expect(result.Code).To(gomega.Equal(0))
   356  	lines := strings.Split(result.Stdout, "\n")
   357  
   358  	usage, err = strconv.ParseFloat(lines[0], 64)
   359  	framework.ExpectNoError(err, "Cannot parse float for usage")
   360  	uptime, err = strconv.ParseFloat(lines[1], 64)
   361  	framework.ExpectNoError(err, "Cannot parse float for uptime")
   362  
   363  	// Convert from nanoseconds to seconds
   364  	usage *= 1e-9
   365  	return
   366  }
   367  
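         // isHostRunningCgroupV2 reports whether the host mounts the unified cgroup v2 hierarchy,
         // based on the filesystem type of /sys/fs/cgroup queried over SSH.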
   368  func isHostRunningCgroupV2(ctx context.Context, f *framework.Framework, host string) bool {
   369  	result, err := e2essh.SSH(ctx, "stat -fc %T /sys/fs/cgroup/", host, framework.TestContext.Provider)
   370  	framework.ExpectNoError(err)
   371  	gomega.Expect(result.Code).To(gomega.Equal(0))
   372  
   373  	// 0x63677270 == CGROUP2_SUPER_MAGIC
   374  	// https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html
   375  	return strings.Contains(result.Stdout, "cgroup2") || strings.Contains(result.Stdout, "0x63677270")
   376  }
   377  
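         // getNpdPodStat fetches the kubelet stats summary for the node and returns the
         // node-problem-detector pod's CPU usage (cores), RSS, and working set (MB),
         // failing the test if no such pod is found.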
   378  func getNpdPodStat(ctx context.Context, f *framework.Framework, nodeName string) (cpuUsage, rss, workingSet float64) {
   379  	summary, err := e2ekubelet.GetStatsSummary(ctx, f.ClientSet, nodeName)
   380  	framework.ExpectNoError(err)
   381  
   382  	hasNpdPod := false
   383  	for _, pod := range summary.Pods {
   384  		if !strings.HasPrefix(pod.PodRef.Name, "node-problem-detector") {
   385  			continue
   386  		}
   387  		if pod.CPU != nil && pod.CPU.UsageNanoCores != nil {
   388  			cpuUsage = float64(*pod.CPU.UsageNanoCores) * 1e-9
   389  		}
   390  		if pod.Memory != nil {
   391  			if pod.Memory.RSSBytes != nil {
   392  				rss = float64(*pod.Memory.RSSBytes) / 1024 / 1024
   393  			}
   394  			if pod.Memory.WorkingSetBytes != nil {
   395  				workingSet = float64(*pod.Memory.WorkingSetBytes) / 1024 / 1024
   396  			}
   397  		}
   398  		hasNpdPod = true
   399  		break
   400  	}
   401  	if !hasNpdPod {
   402  		framework.Failf("No node-problem-detector pod is present in %+v", summary.Pods)
   403  	}
   404  	return
   405  }
   406  
