...

Source file src/k8s.io/kubernetes/test/e2e_node/util.go

Documentation: k8s.io/kubernetes/test/e2e_node

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package e2enode
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"encoding/json"
    23  	"flag"
    24  	"fmt"
    25  	"io"
    26  	"net"
    27  	"net/http"
    28  	"os"
    29  	"os/exec"
    30  	"regexp"
    31  	"strconv"
    32  	"strings"
    33  	"time"
    34  
    35  	"k8s.io/kubernetes/pkg/util/procfs"
    36  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    37  
    38  	oteltrace "go.opentelemetry.io/otel/trace"
    39  
    40  	v1 "k8s.io/api/core/v1"
    41  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    42  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    43  	"k8s.io/apimachinery/pkg/util/runtime"
    44  	"k8s.io/apimachinery/pkg/util/sets"
    45  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    46  	clientset "k8s.io/client-go/kubernetes"
    47  	"k8s.io/component-base/featuregate"
    48  	internalapi "k8s.io/cri-api/pkg/apis"
    49  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    50  	"k8s.io/klog/v2"
    51  	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
    52  	kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
    53  	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
    54  	"k8s.io/kubelet/pkg/types"
    55  	"k8s.io/kubernetes/pkg/cluster/ports"
    56  	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
    57  	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
    58  	"k8s.io/kubernetes/pkg/kubelet/cm"
    59  	"k8s.io/kubernetes/pkg/kubelet/cri/remote"
    60  	kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
    61  	"k8s.io/kubernetes/pkg/kubelet/util"
    62  
    63  	"github.com/coreos/go-systemd/v22/dbus"
    64  	"k8s.io/kubernetes/test/e2e/framework"
    65  	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
    66  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    67  	e2enodekubelet "k8s.io/kubernetes/test/e2e_node/kubeletconfig"
    68  	imageutils "k8s.io/kubernetes/test/utils/image"
    69  
    70  	"github.com/onsi/ginkgo/v2"
    71  	"github.com/onsi/gomega"
    72  )
    73  
    74  var startServices = flag.Bool("start-services", true, "If true, start local node services")
    75  var stopServices = flag.Bool("stop-services", true, "If true, stop local node services after running tests")
    76  var busyboxImage = imageutils.GetE2EImage(imageutils.BusyBox)
    77  var agnhostImage = imageutils.GetE2EImage(imageutils.Agnhost)
    78  
    79  const (
    80  	// Kubelet internal cgroup name for node allocatable cgroup.
    81  	defaultNodeAllocatableCgroup = "kubepods"
    82  	// defaultPodResourcesPath is the path to the local endpoint serving the podresources GRPC service.
    83  	defaultPodResourcesPath    = "/var/lib/kubelet/pod-resources"
    84  	defaultPodResourcesTimeout = 10 * time.Second
    85  	defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 MiB
    86  	// state files
    87  	cpuManagerStateFile    = "/var/lib/kubelet/cpu_manager_state"
    88  	memoryManagerStateFile = "/var/lib/kubelet/memory_manager_state"
    89  )
    90  
    91  var (
    92  	kubeletHealthCheckURL    = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
    93  	containerRuntimeUnitName = ""
    94  	// kubeletCfg is the kubelet configuration the test is running against.
    95  	kubeletCfg *kubeletconfig.KubeletConfiguration
    96  )
    97  
    98  func getNodeSummary(ctx context.Context) (*stats.Summary, error) {
    99  	kubeletConfig, err := getCurrentKubeletConfig(ctx)
   100  	if err != nil {
   101  		return nil, fmt.Errorf("failed to get current kubelet config: %w", err)
   102  	}
   103  	req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil)
   104  	if err != nil {
   105  		return nil, fmt.Errorf("failed to build http request: %w", err)
   106  	}
   107  	req.Header.Add("Accept", "application/json")
   108  
   109  	client := &http.Client{}
   110  	resp, err := client.Do(req)
   111  	if err != nil {
   112  		return nil, fmt.Errorf("failed to get /stats/summary: %w", err)
   113  	}
   114  
   115  	defer resp.Body.Close()
   116  	contentsBytes, err := io.ReadAll(resp.Body)
   117  	if err != nil {
   118  		return nil, fmt.Errorf("failed to read /stats/summary: %+v", resp)
   119  	}
   120  
   121  	decoder := json.NewDecoder(strings.NewReader(string(contentsBytes)))
   122  	summary := stats.Summary{}
   123  	err = decoder.Decode(&summary)
   124  	if err != nil {
   125  		return nil, fmt.Errorf("failed to parse /stats/summary to go struct: %+v", resp)
   126  	}
   127  	return &summary, nil
   128  }
   129  
   130  func getV1alpha1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1alpha1.ListPodResourcesResponse, error) {
   131  	endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
   132  	if err != nil {
   133  		return nil, fmt.Errorf("Error getting local endpoint: %w", err)
   134  	}
   135  	client, conn, err := podresources.GetV1alpha1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
   136  	if err != nil {
   137  		return nil, fmt.Errorf("Error getting grpc client: %w", err)
   138  	}
   139  	defer conn.Close()
   140  	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
   141  	defer cancel()
   142  	resp, err := client.List(ctx, &kubeletpodresourcesv1alpha1.ListPodResourcesRequest{})
   143  	if err != nil {
   144  		return nil, fmt.Errorf("%v.List(_) = _, %v", client, err)
   145  	}
   146  	return resp, nil
   147  }
   148  
   149  func getV1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1.ListPodResourcesResponse, error) {
   150  	endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
   151  	if err != nil {
   152  		return nil, fmt.Errorf("Error getting local endpoint: %w", err)
   153  	}
   154  	client, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
   155  	if err != nil {
   156  		return nil, fmt.Errorf("Error getting gRPC client: %w", err)
   157  	}
   158  	defer conn.Close()
   159  	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
   160  	defer cancel()
   161  	resp, err := client.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
   162  	if err != nil {
   163  		return nil, fmt.Errorf("%v.List(_) = _, %v", client, err)
   164  	}
   165  	return resp, nil
   166  }
   167  
   168  // getCurrentKubeletConfig returns the current KubeletConfiguration.
   169  func getCurrentKubeletConfig(ctx context.Context) (*kubeletconfig.KubeletConfiguration, error) {
   170  	// namespace only relevant if useProxy==true, so we don't bother
   171  	return e2enodekubelet.GetCurrentKubeletConfig(ctx, framework.TestContext.NodeName, "", false, framework.TestContext.StandaloneMode)
   172  }
   173  
   174  func cleanupPods(f *framework.Framework) {
   175  	ginkgo.AfterEach(func(ctx context.Context) {
   176  		ginkgo.By("Deleting any Pods created by the test in namespace: " + f.Namespace.Name)
   177  		l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{})
   178  		framework.ExpectNoError(err)
   179  		for _, p := range l.Items {
   180  			if p.Namespace != f.Namespace.Name {
   181  				continue
   182  			}
   183  			framework.Logf("Deleting pod: %s", p.Name)
   184  			e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute)
   185  		}
   186  	})
   187  }
   188  
   189  // tempSetCurrentKubeletConfig must be called within a ginkgo Context. It lets updateFunction modify the
   190  // KubeletConfiguration during the BeforeEach of that context.
   191  // The change is reverted in the AfterEach of the context.
   192  func tempSetCurrentKubeletConfig(f *framework.Framework, updateFunction func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration)) {
   193  	var oldCfg *kubeletconfig.KubeletConfiguration
   194  
   195  	ginkgo.BeforeEach(func(ctx context.Context) {
   196  		var err error
   197  		oldCfg, err = getCurrentKubeletConfig(ctx)
   198  		framework.ExpectNoError(err)
   199  
   200  		newCfg := oldCfg.DeepCopy()
   201  		updateFunction(ctx, newCfg)
   202  		if apiequality.Semantic.DeepEqual(*newCfg, *oldCfg) {
   203  			return
   204  		}
   205  
   206  		updateKubeletConfig(ctx, f, newCfg, true)
   207  	})
   208  
   209  	ginkgo.AfterEach(func(ctx context.Context) {
   210  		if oldCfg != nil {
   211  			// Update the Kubelet configuration.
   212  			updateKubeletConfig(ctx, f, oldCfg, true)
   213  		}
   214  	})
   215  }
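
        // Hypothetical usage sketch for tempSetCurrentKubeletConfig (illustrative only;
        // the tweaked fields below are assumptions, not taken from this file):
        //
        //	tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
        //		// assumed example: run this context with the static CPU manager policy
        //		initialConfig.CPUManagerPolicy = "static"
        //		initialConfig.ReservedSystemCPUs = "0"
        //	})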
   216  
   217  func updateKubeletConfig(ctx context.Context, f *framework.Framework, kubeletConfig *kubeletconfig.KubeletConfiguration, deleteStateFiles bool) {
   218  	// Update the Kubelet configuration.
   219  	ginkgo.By("Stopping the kubelet")
   220  	startKubelet := stopKubelet()
   221  
   222  	// wait until the kubelet health check fails
   223  	gomega.Eventually(ctx, func() bool {
   224  		return kubeletHealthCheck(kubeletHealthCheckURL)
   225  	}, time.Minute, time.Second).Should(gomega.BeFalse())
   226  
   227  	// Delete the CPU and memory manager state files to make sure they do not prevent the kubelet from restarting
   228  	if deleteStateFiles {
   229  		deleteStateFile(cpuManagerStateFile)
   230  		deleteStateFile(memoryManagerStateFile)
   231  	}
   232  
   233  	framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(kubeletConfig))
   234  
   235  	ginkgo.By("Starting the kubelet")
   236  	startKubelet()
   237  	waitForKubeletToStart(ctx, f)
   238  }
   239  
   240  func waitForKubeletToStart(ctx context.Context, f *framework.Framework) {
   241  	// wait until the kubelet health check succeeds
   242  	gomega.Eventually(ctx, func() bool {
   243  		return kubeletHealthCheck(kubeletHealthCheckURL)
   244  	}, 2*time.Minute, 5*time.Second).Should(gomega.BeTrue())
   245  
   246  	// Wait for the Kubelet to be ready.
   247  	gomega.Eventually(ctx, func(ctx context.Context) bool {
   248  		nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
   249  		framework.ExpectNoError(err)
   250  		return nodes == 1
   251  	}, time.Minute, time.Second).Should(gomega.BeTrue())
   252  }
   253  
   254  func deleteStateFile(stateFileName string) {
   255  	err := exec.Command("/bin/sh", "-c", fmt.Sprintf("rm -f %s", stateFileName)).Run()
   256  	framework.ExpectNoError(err, "failed to delete the state file")
   257  }
   258  
   259  // listNamespaceEvents lists the events in the given namespace.
   260  func listNamespaceEvents(ctx context.Context, c clientset.Interface, ns string) error {
   261  	ls, err := c.CoreV1().Events(ns).List(ctx, metav1.ListOptions{})
   262  	if err != nil {
   263  		return err
   264  	}
   265  	for _, event := range ls.Items {
   266  		klog.Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
   267  	}
   268  	return nil
   269  }
   270  
   271  func logPodEvents(ctx context.Context, f *framework.Framework) {
   272  	framework.Logf("Summary of pod events during the test:")
   273  	err := listNamespaceEvents(ctx, f.ClientSet, f.Namespace.Name)
   274  	framework.ExpectNoError(err)
   275  }
   276  
   277  func logNodeEvents(ctx context.Context, f *framework.Framework) {
   278  	framework.Logf("Summary of node events during the test:")
   279  	err := listNamespaceEvents(ctx, f.ClientSet, "")
   280  	framework.ExpectNoError(err)
   281  }
   282  
   283  func getLocalNode(ctx context.Context, f *framework.Framework) *v1.Node {
   284  	nodeList, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
   285  	framework.ExpectNoError(err)
   286  	gomega.Expect(nodeList.Items).Should(gomega.HaveLen(1), "Unexpected number of node objects for node e2e. Expects only one node.")
   287  	return &nodeList.Items[0]
   288  }
   289  
   290  // getLocalTestNode fetches the node object describing the local worker node set up by the e2e_node infra, along with its ready state.
   291  // getLocalTestNode is a variant of `getLocalNode` which reports, but does not enforce, the node readiness state, leaving
   292  // that decision to the caller. The check is intentionally performed the same way `getLocalNode` does it.
   293  // Note that `getLocalNode` implicitly aborts the test (via gomega.Expect) if the worker node is not ready.
   294  func getLocalTestNode(ctx context.Context, f *framework.Framework) (*v1.Node, bool) {
   295  	node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, framework.TestContext.NodeName, metav1.GetOptions{})
   296  	framework.ExpectNoError(err)
   297  	ready := e2enode.IsNodeReady(node)
   298  	schedulable := e2enode.IsNodeSchedulable(node)
   299  	framework.Logf("node %q ready=%v schedulable=%v", node.Name, ready, schedulable)
   300  	return node, ready && schedulable
   301  }
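
        // Hypothetical usage sketch for getLocalTestNode (illustrative only):
        //
        //	node, ready := getLocalTestNode(ctx, f)
        //	if !ready {
        //		// the caller, not the helper, decides how to handle a not-ready node
        //		framework.Logf("node %q not ready, skipping the rest of the check", node.Name)
        //		return
        //	}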
   302  
   303  // logKubeletLatencyMetrics logs KubeletLatencyMetrics computed from the Prometheus
   304  // metrics exposed on the current node and identified by the metricNames.
   305  // The Kubelet subsystem prefix is automatically prepended to these metric names.
   306  func logKubeletLatencyMetrics(ctx context.Context, metricNames ...string) {
   307  	metricSet := sets.NewString()
   308  	for _, key := range metricNames {
   309  		metricSet.Insert(kubeletmetrics.KubeletSubsystem + "_" + key)
   310  	}
   311  	metric, err := e2emetrics.GrabKubeletMetricsWithoutProxy(ctx, fmt.Sprintf("%s:%d", nodeNameOrIP(), ports.KubeletReadOnlyPort), "/metrics")
   312  	if err != nil {
   313  		framework.Logf("Error getting kubelet metrics: %v", err)
   314  	} else {
   315  		framework.Logf("Kubelet Metrics: %+v", e2emetrics.GetKubeletLatencyMetrics(metric, metricSet))
   316  	}
   317  }
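
        // Hypothetical usage sketch for logKubeletLatencyMetrics (illustrative only; the
        // metric key is an assumption). The kubelet subsystem prefix is prepended by the
        // helper, so callers pass bare metric names:
        //
        //	logKubeletLatencyMetrics(ctx, kubeletmetrics.PodWorkerDurationKey)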
   318  
   319  // getCRIClient connects CRI and returns CRI runtime service clients and image service client.
   320  func getCRIClient() (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
   321  	// connection timeout for CRI service connection
   322  	const connectionTimeout = 2 * time.Minute
   323  	runtimeEndpoint := framework.TestContext.ContainerRuntimeEndpoint
   324  	r, err := remote.NewRemoteRuntimeService(runtimeEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider())
   325  	if err != nil {
   326  		return nil, nil, err
   327  	}
   328  	imageManagerEndpoint := runtimeEndpoint
   329  	if framework.TestContext.ImageServiceEndpoint != "" {
   330  		// ImageServiceEndpoint is the same as ContainerRuntimeEndpoint if not
   331  		// explicitly specified
   332  		imageManagerEndpoint = framework.TestContext.ImageServiceEndpoint
   333  	}
   334  	i, err := remote.NewRemoteImageService(imageManagerEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider())
   335  	if err != nil {
   336  		return nil, nil, err
   337  	}
   338  	return r, i, nil
   339  }
   340  
   341  // findKubeletServiceName searches for the kubelet unit name among the services known to systemd.
   342  // If the `running` parameter is true, the search is restricted to currently running services;
   343  // otherwise stopped, failed, exited (non-running in general) services are considered as well.
   344  // TODO: Find a uniform way to deal with systemctl/initctl/service operations. #34494
   345  func findKubeletServiceName(running bool) string {
   346  	cmdLine := []string{
   347  		"systemctl", "list-units", "*kubelet*",
   348  	}
   349  	if running {
   350  		cmdLine = append(cmdLine, "--state=running")
   351  	}
   352  	stdout, err := exec.Command("sudo", cmdLine...).CombinedOutput()
   353  	framework.ExpectNoError(err)
   354  	regex := regexp.MustCompile("(kubelet-\\w+)")
   355  	matches := regex.FindStringSubmatch(string(stdout))
   356  	gomega.Expect(matches).ToNot(gomega.BeEmpty(), "Failed to find a kubelet service in systemctl output: %q", stdout)
   357  	kubeletServiceName := matches[0]
   358  	framework.Logf("Get running kubelet with systemctl: %v, %v", string(stdout), kubeletServiceName)
   359  	return kubeletServiceName
   360  }
   361  
   362  func findContainerRuntimeServiceName() (string, error) {
   363  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   364  	defer cancel()
   365  
   366  	conn, err := dbus.NewWithContext(ctx)
   367  	framework.ExpectNoError(err, "Failed to setup dbus connection")
   368  	defer conn.Close()
   369  
   370  	runtimePids, err := getPidsForProcess(framework.TestContext.ContainerRuntimeProcessName, framework.TestContext.ContainerRuntimePidFile)
   371  	framework.ExpectNoError(err, "failed to get list of container runtime pids")
   372  	gomega.Expect(runtimePids).To(gomega.HaveLen(1), "Unexpected number of container runtime pids. Expected 1 but got %v", len(runtimePids))
   373  
   374  	containerRuntimePid := runtimePids[0]
   375  
   376  	unitName, err := conn.GetUnitNameByPID(ctx, uint32(containerRuntimePid))
   377  	framework.ExpectNoError(err, "Failed to get container runtime unit name")
   378  
   379  	return unitName, nil
   380  }
   381  
   382  type containerRuntimeUnitOp int
   383  
   384  const (
   385  	startContainerRuntimeUnitOp containerRuntimeUnitOp = iota
   386  	stopContainerRuntimeUnitOp
   387  )
   388  
   389  func performContainerRuntimeUnitOp(op containerRuntimeUnitOp) error {
   390  	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
   391  	defer cancel()
   392  
   393  	conn, err := dbus.NewWithContext(ctx)
   394  	framework.ExpectNoError(err, "Failed to setup dbus connection")
   395  	defer conn.Close()
   396  
   397  	if containerRuntimeUnitName == "" {
   398  		containerRuntimeUnitName, err = findContainerRuntimeServiceName()
   399  		framework.ExpectNoError(err, "Failed to find container runtime name")
   400  	}
   401  
   402  	reschan := make(chan string)
   403  
   404  	switch op {
   405  	case startContainerRuntimeUnitOp:
   406  		_, err = conn.StartUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
   407  	case stopContainerRuntimeUnitOp:
   408  		_, err = conn.StopUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
   409  	default:
   410  		framework.Failf("Unexpected container runtime op: %v", op)
   411  	}
   412  	framework.ExpectNoError(err, "dbus connection error")
   413  
   414  	job := <-reschan
   415  	gomega.Expect(job).To(gomega.Equal("done"), "Expected job to complete with done")
   416  
   417  	return nil
   418  }
   419  
   420  func stopContainerRuntime() error {
   421  	return performContainerRuntimeUnitOp(stopContainerRuntimeUnitOp)
   422  }
   423  
   424  func startContainerRuntime() error {
   425  	return performContainerRuntimeUnitOp(startContainerRuntimeUnitOp)
   426  }
   427  
   428  // restartKubelet restarts the current kubelet service.
   429  // The "current" kubelet service is the instance managed by the current e2e_node test run.
   430  // If `running` is true, the kubelet is restarted only if it is actually running. In some cases
   431  // the kubelet may have exited or been stopped, typically because it was intentionally stopped
   432  // earlier during a test or, sometimes, because it just crashed.
   433  // Warning: the "current" kubelet is poorly defined. It is assumed to be the most recently
   434  // created kubelet service unit; in other words, there is no unique ID that explicitly binds a
   435  // kubelet instance to a test run.
   436  func restartKubelet(running bool) {
   437  	kubeletServiceName := findKubeletServiceName(running)
   438  	// reset the kubelet service start-limit-hit
   439  	stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput()
   440  	framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %s", err, string(stdout))
   441  
   442  	stdout, err = exec.Command("sudo", "systemctl", "restart", kubeletServiceName).CombinedOutput()
   443  	framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %s", err, string(stdout))
   444  }
   445  
   446  // stopKubelet kills the running kubelet and returns a func that will restart the process again
   447  func stopKubelet() func() {
   448  	kubeletServiceName := findKubeletServiceName(true)
   449  
   450  	// reset the kubelet service start-limit-hit
   451  	stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput()
   452  	framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %s", err, string(stdout))
   453  
   454  	stdout, err = exec.Command("sudo", "systemctl", "kill", kubeletServiceName).CombinedOutput()
   455  	framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %s", err, string(stdout))
   456  
   457  	return func() {
   458  		// we should restart the service, otherwise the transient service start will fail
   459  		stdout, err := exec.Command("sudo", "systemctl", "restart", kubeletServiceName).CombinedOutput()
   460  		framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %s", err, string(stdout))
   461  	}
   462  }
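
        // Hypothetical usage sketch for stopKubelet (illustrative only): the returned func
        // restarts the same kubelet service unit, mirroring how updateKubeletConfig uses it above.
        //
        //	restartKubeletFn := stopKubelet()
        //	// ... do work that requires the kubelet to be down ...
        //	restartKubeletFn()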
   463  
   464  // killKubelet sends a signal (SIGINT, SIGSTOP, SIGTERM...) to the running kubelet
   465  func killKubelet(sig string) {
   466  	kubeletServiceName := findKubeletServiceName(true)
   467  
   468  	// reset the kubelet service start-limit-hit
   469  	stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput()
   470  	framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %s", err, string(stdout))
   471  
   472  	stdout, err = exec.Command("sudo", "systemctl", "kill", "-s", sig, kubeletServiceName).CombinedOutput()
   473  	framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %s", err, string(stdout))
   474  }
   475  
   476  func kubeletHealthCheck(url string) bool {
   477  	insecureTransport := http.DefaultTransport.(*http.Transport).Clone()
   478  	insecureTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
   479  	insecureHTTPClient := &http.Client{
   480  		Transport: insecureTransport,
   481  	}
   482  
   483  	req, err := http.NewRequest("HEAD", url, nil)
   484  	if err != nil {
   485  		return false
   486  	}
   487  	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", framework.TestContext.BearerToken))
   488  	resp, err := insecureHTTPClient.Do(req)
   489  	if err != nil {
   490  		klog.Warningf("Health check on %q failed, error=%v", url, err)
   491  	} else if resp.StatusCode != http.StatusOK {
   492  		klog.Warningf("Health check on %q failed, status=%d", url, resp.StatusCode)
   493  	}
   494  	return err == nil && resp.StatusCode == http.StatusOK
   495  }
   496  
   497  func toCgroupFsName(cgroupName cm.CgroupName) string {
   498  	if kubeletCfg.CgroupDriver == "systemd" {
   499  		return cgroupName.ToSystemd()
   500  	}
   501  	return cgroupName.ToCgroupfs()
   502  }
   503  
   504  // reduceAllocatableMemoryUsageIfCgroupv1 uses memory.force_empty (https://lwn.net/Articles/432224/)
   505  // to make the kernel reclaim memory in the allocatable cgroup.
   506  // The time needed to reduce pressure may be unbounded, but it usually finishes within a second.
   507  // memory.force_empty is not supported in cgroup v2.
   508  func reduceAllocatableMemoryUsageIfCgroupv1() {
   509  	if !IsCgroup2UnifiedMode() {
   510  		cmd := fmt.Sprintf("echo 0 > /sys/fs/cgroup/memory/%s/memory.force_empty", toCgroupFsName(cm.NewCgroupName(cm.RootCgroupName, defaultNodeAllocatableCgroup)))
   511  		_, err := exec.Command("sudo", "sh", "-c", cmd).CombinedOutput()
   512  		framework.ExpectNoError(err)
   513  	}
   514  }
   515  
   516  // withFeatureGate is the equivalent of featuregatetesting.SetFeatureGateDuringTest,
   517  // which can't be used here because we're not in a testing context.
   518  // This must be in a non-"_test" file to pass
   519  // `make verify WHAT=test-featuregates`.
   520  func withFeatureGate(feature featuregate.Feature, desired bool) func() {
   521  	current := utilfeature.DefaultFeatureGate.Enabled(feature)
   522  	utilfeature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=%v", string(feature), desired))
   523  	return func() {
   524  		utilfeature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=%v", string(feature), current))
   525  	}
   526  }
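
        // Hypothetical usage sketch for withFeatureGate (illustrative only; the feature
        // gate name is an assumption):
        //
        //	cleanup := withFeatureGate("SomeAlphaFeature", true)
        //	defer cleanup()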
   527  
   528  // waitForAllContainerRemoval waits until all the containers of a given pod are actually gone.
   529  // This is needed by the e2e tests which involve exclusive resource allocation (CPU and topology manager, podresources, etc.).
   530  // In these cases we need to make sure the tests clean up after themselves, so that each test runs in
   531  // a pristine environment. The only way known so far to do that is to introduce this wait.
   532  // Worth noting, however, that this wait makes the tests take noticeably longer.
   533  func waitForAllContainerRemoval(ctx context.Context, podName, podNS string) {
   534  	rs, _, err := getCRIClient()
   535  	framework.ExpectNoError(err)
   536  	gomega.Eventually(ctx, func(ctx context.Context) error {
   537  		containers, err := rs.ListContainers(ctx, &runtimeapi.ContainerFilter{
   538  			LabelSelector: map[string]string{
   539  				types.KubernetesPodNameLabel:      podName,
   540  				types.KubernetesPodNamespaceLabel: podNS,
   541  			},
   542  		})
   543  		if err != nil {
   544  			return fmt.Errorf("got error waiting for all containers to be removed from CRI: %v", err)
   545  		}
   546  
   547  		if len(containers) > 0 {
   548  			return fmt.Errorf("expected all containers to be removed from CRI but %v containers still remain. Containers: %+v", len(containers), containers)
   549  		}
   550  		return nil
   551  	}, 2*time.Minute, 1*time.Second).Should(gomega.Succeed())
   552  }
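
        // Hypothetical usage sketch for waitForAllContainerRemoval (illustrative only; the
        // pod variable is an assumption). Typically called after the pod has been deleted,
        // to guarantee a pristine environment for the next test:
        //
        //	waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)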
   553  
   554  func getPidsForProcess(name, pidFile string) ([]int, error) {
   555  	if len(pidFile) > 0 {
   556  		pid, err := getPidFromPidFile(pidFile)
   557  		if err == nil {
   558  			return []int{pid}, nil
   559  		}
   560  		// log the error and fall back to pidof
   561  		runtime.HandleError(err)
   562  	}
   563  	return procfs.PidOf(name)
   564  }
   565  
   566  func getPidFromPidFile(pidFile string) (int, error) {
   567  	file, err := os.Open(pidFile)
   568  	if err != nil {
   569  		return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
   570  	}
   571  	defer file.Close()
   572  
   573  	data, err := io.ReadAll(file)
   574  	if err != nil {
   575  		return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
   576  	}
   577  
   578  	pid, err := strconv.Atoi(string(data))
   579  	if err != nil {
   580  		return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
   581  	}
   582  
   583  	return pid, nil
   584  }
   585  
   586  // WaitForPodInitContainerRestartCount waits for the given Pod init container
   587  // to achieve at least a given restartCount
   588  // TODO: eventually look at moving to test/e2e/framework/pod
   589  func WaitForPodInitContainerRestartCount(ctx context.Context, c clientset.Interface, namespace, podName string, initContainerIndex int, desiredRestartCount int32, timeout time.Duration) error {
   590  	conditionDesc := fmt.Sprintf("init container %d started", initContainerIndex)
   591  	return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
   592  		if initContainerIndex > len(pod.Status.InitContainerStatuses)-1 {
   593  			return false, nil
   594  		}
   595  		containerStatus := pod.Status.InitContainerStatuses[initContainerIndex]
   596  		return containerStatus.RestartCount >= desiredRestartCount, nil
   597  	})
   598  }
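
        // Hypothetical usage sketch for WaitForPodInitContainerRestartCount (illustrative
        // only; the pod name, container index, and counts are assumptions):
        //
        //	err := WaitForPodInitContainerRestartCount(ctx, f.ClientSet, f.Namespace.Name, "test-pod", 0, 3, 2*time.Minute)
        //	framework.ExpectNoError(err, "init container did not reach the desired restart count")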
   599  
   600  // WaitForPodContainerRestartCount waits for the given Pod container to achieve at least a given restartCount
   601  // TODO: eventually look at moving to test/e2e/framework/pod
   602  func WaitForPodContainerRestartCount(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, desiredRestartCount int32, timeout time.Duration) error {
   603  	conditionDesc := fmt.Sprintf("container %d started", containerIndex)
   604  	return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
   605  		if containerIndex > len(pod.Status.ContainerStatuses)-1 {
   606  			return false, nil
   607  		}
   608  		containerStatus := pod.Status.ContainerStatuses[containerIndex]
   609  		return containerStatus.RestartCount >= desiredRestartCount, nil
   610  	})
   611  }
   612  
   613  // WaitForPodInitContainerToFail waits for the given Pod init container to fail with the given reason, specifically due to
   614  // invalid container configuration. In this case, the container will remain in a waiting state with a specific
   615  // reason set, which should match the given reason.
   616  // TODO: eventually look at moving to test/e2e/framework/pod
   617  func WaitForPodInitContainerToFail(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error {
   618  	conditionDesc := fmt.Sprintf("container %d failed with reason %s", containerIndex, reason)
   619  	return e2epod.WaitForPodCondition(ctx, c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
   620  		switch pod.Status.Phase {
   621  		case v1.PodPending:
   622  			if len(pod.Status.InitContainerStatuses) == 0 {
   623  				return false, nil
   624  			}
   625  			containerStatus := pod.Status.InitContainerStatuses[containerIndex]
   626  			if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == reason {
   627  				return true, nil
   628  			}
   629  			return false, nil
   630  		case v1.PodFailed, v1.PodRunning, v1.PodSucceeded:
   631  			return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase)
   632  		}
   633  		return false, nil
   634  	})
   635  }
   636  
   637  func nodeNameOrIP() string {
   638  	return "localhost"
   639  }
   640  
