...

Source file src/k8s.io/kubernetes/test/e2e_node/services/kubelet.go

Documentation: k8s.io/kubernetes/test/e2e_node/services

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package services
    18  
    19  import (
    20  	"flag"
    21  	"fmt"
    22  	"os"
    23  	"os/exec"
    24  	"path/filepath"
    25  	"regexp"
    26  	"strconv"
    27  	"strings"
    28  	"time"
    29  
    30  	cliflag "k8s.io/component-base/cli/flag"
    31  	"k8s.io/klog/v2"
    32  	kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
    33  
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/kubernetes/cmd/kubelet/app/options"
    36  	"k8s.io/kubernetes/pkg/cluster/ports"
    37  	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
    38  	"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
    39  	kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
    40  	utilfs "k8s.io/kubernetes/pkg/util/filesystem"
    41  	"k8s.io/kubernetes/test/e2e/framework"
    42  	"k8s.io/kubernetes/test/e2e_node/builder"
    43  	"k8s.io/kubernetes/test/e2e_node/remote"
    44  )
    45  
    46  // TODO(random-liu): Replace this with standard kubelet launcher.
    47  
    48  // args is the type used to accumulate args from the flags with the same name.
    49  type args []string
    50  
    51  // String function of flag.Value
    52  func (a *args) String() string {
    53  	return fmt.Sprint(*a)
    54  }
    55  
    56  // Set function of flag.Value
    57  func (a *args) Set(value string) error {
    58  	// Note that we assume all white space in flag string is separating fields
    59  	na := strings.Fields(value)
    60  	*a = append(*a, na...)
    61  	return nil
    62  }
    63  
    64  // kubeletArgs is the override kubelet args specified by the test runner.
    65  var kubeletArgs args
    66  var kubeletConfigFile = "./kubeletconfig.yaml"
    67  
    68  func init() {
    69  	flag.Var(&kubeletArgs, "kubelet-flags", "Kubelet flags passed to kubelet, this will override default kubelet flags in the test. Flags specified in multiple kubelet-flags will be concatenate. Deprecated, see: --kubelet-config-file.")
    70  	if flag.Lookup("kubelet-config-file") == nil {
    71  		flag.StringVar(&kubeletConfigFile, "kubelet-config-file", kubeletConfigFile, "The base KubeletConfiguration to use when setting up the kubelet. This configuration will then be minimially modified to support requirements from the test suite.")
    72  	}
    73  }
    74  
    75  // RunKubelet starts kubelet and waits for termination signal. Once receives the
    76  // termination signal, it will stop the kubelet gracefully.
    77  func RunKubelet(featureGates map[string]bool) {
    78  	var err error
    79  	// Enable monitorParent to make sure kubelet will receive termination signal
    80  	// when test process exits.
    81  	e := NewE2EServices(true /* monitorParent */)
    82  	defer e.Stop()
    83  	e.kubelet, err = e.startKubelet(featureGates)
    84  	if err != nil {
    85  		klog.Fatalf("Failed to start kubelet: %v", err)
    86  	}
    87  	// Wait until receiving a termination signal.
    88  	waitForTerminationSignal()
    89  }
    90  
    91  const (
    92  	// KubeletRootDirectory specifies the directory where the kubelet runtime information is stored.
    93  	KubeletRootDirectory = "/var/lib/kubelet"
    94  )
    95  
    96  // Health check url of kubelet
    97  var kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
    98  
    99  func baseKubeConfiguration(cfgPath string) (*kubeletconfig.KubeletConfiguration, error) {
   100  	cfgPath, err := filepath.Abs(cfgPath)
   101  	if err != nil {
   102  		return nil, err
   103  	}
   104  
   105  	_, err = os.Stat(cfgPath)
   106  	if err != nil {
   107  		// If the kubeletconfig exists, but for some reason we can't read it, then
   108  		// return an error to avoid silently skipping it.
   109  		if !os.IsNotExist(err) {
   110  			return nil, err
   111  		}
   112  
   113  		// If the kubeletconfig file doesn't exist, then use a default configuration
   114  		// as the base.
   115  		kc, err := options.NewKubeletConfiguration()
   116  		if err != nil {
   117  			return nil, err
   118  		}
   119  
   120  		// The following values should match the contents of
   121  		// test/e2e_node/jenkins/default-kubelet-config.yaml. We can't use go embed
   122  		// here to fallback as default config lives in a parallel directory.
   123  		// TODO(endocrimes): Remove fallback for lack of kubelet config when all
   124  		//                   uses of e2e_node switch to providing one (or move to
   125  		//                   kubetest2 and pick up the default).
   126  		kc.CgroupRoot = "/"
   127  		kc.VolumeStatsAggPeriod = metav1.Duration{Duration: 10 * time.Second}
   128  		kc.SerializeImagePulls = false
   129  		kc.FileCheckFrequency = metav1.Duration{Duration: 10 * time.Second}
   130  		kc.PodCIDR = "10.100.0.0/24"
   131  		kc.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 30 * time.Second}
   132  		kc.EvictionHard = map[string]string{
   133  			"memory.available":  "250Mi",
   134  			"nodefs.available":  "10%",
   135  			"nodefs.inodesFree": "5%",
   136  		}
   137  		kc.EvictionMinimumReclaim = map[string]string{
   138  			"nodefs.available":  "5%",
   139  			"nodefs.inodesFree": "5%",
   140  		}
   141  
   142  		return kc, nil
   143  	}
   144  
   145  	loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, cfgPath)
   146  	if err != nil {
   147  		return nil, err
   148  	}
   149  
   150  	return loader.Load()
   151  }
   152  
   153  // startKubelet starts the Kubelet in a separate process or returns an error
   154  // if the Kubelet fails to start.
   155  func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error) {
   156  	klog.Info("Starting kubelet")
   157  
   158  	framework.Logf("Standalone mode: %v", framework.TestContext.StandaloneMode)
   159  
   160  	var kubeconfigPath string
   161  
   162  	if !framework.TestContext.StandaloneMode {
   163  		var err error
   164  		// Build kubeconfig
   165  		kubeconfigPath, err = createKubeconfigCWD()
   166  		if err != nil {
   167  			return nil, err
   168  		}
   169  	}
   170  
   171  	// KubeletConfiguration file path
   172  	kubeletConfigPath, err := kubeletConfigCWDPath()
   173  	if err != nil {
   174  		return nil, err
   175  	}
   176  
   177  	// KubeletDropInConfiguration directory path
   178  	framework.TestContext.KubeletConfigDropinDir, err = KubeletConfigDirCWDDir()
   179  	if err != nil {
   180  		return nil, err
   181  	}
   182  
   183  	// Create pod directory
   184  	podPath, err := createPodDirectory()
   185  	if err != nil {
   186  		return nil, err
   187  	}
   188  	e.rmDirs = append(e.rmDirs, podPath)
   189  	err = createRootDirectory(KubeletRootDirectory)
   190  	if err != nil {
   191  		return nil, err
   192  	}
   193  
   194  	lookup := flag.Lookup("kubelet-config-file")
   195  	if lookup != nil {
   196  		kubeletConfigFile = lookup.Value.String()
   197  	}
   198  	kc, err := baseKubeConfiguration(kubeletConfigFile)
   199  	if err != nil {
   200  		return nil, fmt.Errorf("failed to load base kubelet configuration: %w", err)
   201  	}
   202  
   203  	// Apply overrides to allow access to the Kubelet API from the test suite.
   204  	// These are insecure and should generally not be used outside of test infra.
   205  
   206  	// --anonymous-auth
   207  	kc.Authentication.Anonymous.Enabled = true
   208  	// --authentication-token-webhook
   209  	kc.Authentication.Webhook.Enabled = false
   210  	// --authorization-mode
   211  	kc.Authorization.Mode = kubeletconfig.KubeletAuthorizationModeAlwaysAllow
   212  	// --read-only-port
   213  	kc.ReadOnlyPort = ports.KubeletReadOnlyPort
   214  
   215  	// Static Pods are in a per-test location, so we override them for tests.
   216  	kc.StaticPodPath = podPath
   217  
   218  	var killCommand, restartCommand *exec.Cmd
   219  	var isSystemd bool
   220  	var unitName string
   221  	// Apply default kubelet flags.
   222  	cmdArgs := []string{}
   223  	if systemdRun, err := exec.LookPath("systemd-run"); err == nil {
   224  		// On systemd services, detection of a service / unit works reliably while
   225  		// detection of a process started from an ssh session does not work.
   226  		// Since kubelet will typically be run as a service it also makes more
   227  		// sense to test it that way
   228  		isSystemd = true
   229  
   230  		// If we are running on systemd >=240, we can append to the
   231  		// same log file on restarts
   232  		logLocation := "StandardError=file:"
   233  		if version, verr := exec.Command("systemd-run", "--version").Output(); verr == nil {
   234  			// sample output from $ systemd-run --version
   235  			// systemd 245 (245.4-4ubuntu3.13)
   236  			re := regexp.MustCompile(`systemd (\d+)`)
   237  			if match := re.FindSubmatch(version); len(match) > 1 {
   238  				num, _ := strconv.Atoi(string(match[1]))
   239  				if num >= 240 {
   240  					logLocation = "StandardError=append:"
   241  				}
   242  			}
   243  		}
   244  		// We can ignore errors, to have GetTimestampFromWorkspaceDir() fallback
   245  		// to the current time.
   246  		cwd, _ := os.Getwd()
   247  		// Use the timestamp from the current directory to name the systemd unit.
   248  		unitTimestamp := remote.GetTimestampFromWorkspaceDir(cwd)
   249  		unitName = fmt.Sprintf("kubelet-%s.service", unitTimestamp)
   250  		cmdArgs = append(cmdArgs,
   251  			systemdRun,
   252  			"-p", "Delegate=true",
   253  			"-p", logLocation+framework.TestContext.ReportDir+"/kubelet.log",
   254  			"--unit="+unitName,
   255  			"--slice=runtime.slice",
   256  			"--remain-after-exit",
   257  			builder.GetKubeletServerBin())
   258  
   259  		killCommand = exec.Command("systemctl", "kill", unitName)
   260  		restartCommand = exec.Command("systemctl", "restart", unitName)
   261  
   262  		kc.KubeletCgroups = "/kubelet.slice"
   263  	} else {
   264  		cmdArgs = append(cmdArgs, builder.GetKubeletServerBin())
   265  		// TODO(random-liu): Get rid of this docker specific thing.
   266  		cmdArgs = append(cmdArgs, "--runtime-cgroups=/docker-daemon")
   267  
   268  		kc.KubeletCgroups = "/kubelet"
   269  
   270  		kc.SystemCgroups = "/system"
   271  	}
   272  
   273  	if !framework.TestContext.StandaloneMode {
   274  		cmdArgs = append(cmdArgs,
   275  			"--kubeconfig", kubeconfigPath,
   276  		)
   277  	}
   278  
   279  	cmdArgs = append(cmdArgs,
   280  		"--root-dir", KubeletRootDirectory,
   281  		"--v", LogVerbosityLevel,
   282  	)
   283  
   284  	// Apply test framework feature gates by default. This could also be overridden
   285  	// by kubelet-flags.
   286  	if len(featureGates) > 0 {
   287  		cmdArgs = append(cmdArgs, "--feature-gates", cliflag.NewMapStringBool(&featureGates).String())
   288  		kc.FeatureGates = featureGates
   289  	}
   290  
   291  	// Add the KubeletDropinConfigDirectory flag if set.
   292  	cmdArgs = append(cmdArgs, "--config-dir", framework.TestContext.KubeletConfigDropinDir)
   293  
   294  	// Keep hostname override for convenience.
   295  	if framework.TestContext.NodeName != "" { // If node name is specified, set hostname override.
   296  		cmdArgs = append(cmdArgs, "--hostname-override", framework.TestContext.NodeName)
   297  	}
   298  
   299  	if framework.TestContext.ContainerRuntimeEndpoint != "" {
   300  		cmdArgs = append(cmdArgs, "--container-runtime-endpoint", framework.TestContext.ContainerRuntimeEndpoint)
   301  	}
   302  
   303  	if framework.TestContext.ImageServiceEndpoint != "" {
   304  		cmdArgs = append(cmdArgs, "--image-service-endpoint", framework.TestContext.ImageServiceEndpoint)
   305  	}
   306  
   307  	if err := WriteKubeletConfigFile(kc, kubeletConfigPath); err != nil {
   308  		return nil, err
   309  	}
   310  	// add the flag to load config from a file
   311  	cmdArgs = append(cmdArgs, "--config", kubeletConfigPath)
   312  
   313  	// Override the default kubelet flags.
   314  	cmdArgs = append(cmdArgs, kubeletArgs...)
   315  
   316  	// Adjust the args if we are running kubelet with systemd.
   317  	if isSystemd {
   318  		adjustArgsForSystemd(cmdArgs)
   319  	}
   320  
   321  	cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
   322  	restartOnExit := framework.TestContext.RestartKubelet
   323  	server := newServer(
   324  		"kubelet",
   325  		cmd,
   326  		killCommand,
   327  		restartCommand,
   328  		[]string{kubeletHealthCheckURL},
   329  		"kubelet.log",
   330  		e.monitorParent,
   331  		restartOnExit,
   332  		unitName)
   333  	return server, server.start()
   334  }
   335  
   336  // WriteKubeletConfigFile writes the kubelet config file based on the args and returns the filename
   337  func WriteKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path string) error {
   338  	data, err := kubeletconfigcodec.EncodeKubeletConfig(internal, kubeletconfigv1beta1.SchemeGroupVersion)
   339  	if err != nil {
   340  		return err
   341  	}
   342  	// create the directory, if it does not exist
   343  	dir := filepath.Dir(path)
   344  	if err := os.MkdirAll(dir, 0755); err != nil {
   345  		return err
   346  	}
   347  	// write the file
   348  	if err := os.WriteFile(path, data, 0755); err != nil {
   349  		return err
   350  	}
   351  	return nil
   352  }
   353  
   354  // createPodDirectory creates pod directory.
   355  func createPodDirectory() (string, error) {
   356  	cwd, err := os.Getwd()
   357  	if err != nil {
   358  		return "", fmt.Errorf("failed to get current working directory: %w", err)
   359  	}
   360  	path, err := os.MkdirTemp(cwd, "static-pods")
   361  	if err != nil {
   362  		return "", fmt.Errorf("failed to create static pod directory: %w", err)
   363  	}
   364  	return path, nil
   365  }
   366  
   367  // createKubeconfig creates a kubeconfig file at the fully qualified `path`. The parent dirs must exist.
   368  func createKubeconfig(path string) error {
   369  	kubeconfig := []byte(fmt.Sprintf(`apiVersion: v1
   370  kind: Config
   371  users:
   372  - name: kubelet
   373    user:
   374      token: %s
   375  clusters:
   376  - cluster:
   377      server: %s
   378      insecure-skip-tls-verify: true
   379    name: local
   380  contexts:
   381  - context:
   382      cluster: local
   383      user: kubelet
   384    name: local-context
   385  current-context: local-context`, framework.TestContext.BearerToken, getAPIServerClientURL()))
   386  
   387  	if err := os.WriteFile(path, kubeconfig, 0666); err != nil {
   388  		return err
   389  	}
   390  	return nil
   391  }
   392  
   393  func createRootDirectory(path string) error {
   394  	if _, err := os.Stat(path); err != nil {
   395  		if os.IsNotExist(err) {
   396  			return os.MkdirAll(path, os.FileMode(0755))
   397  		}
   398  		return err
   399  	}
   400  	return nil
   401  }
   402  
   403  func kubeconfigCWDPath() (string, error) {
   404  	cwd, err := os.Getwd()
   405  	if err != nil {
   406  		return "", fmt.Errorf("failed to get current working directory: %w", err)
   407  	}
   408  	return filepath.Join(cwd, "kubeconfig"), nil
   409  }
   410  
   411  func kubeletConfigCWDPath() (string, error) {
   412  	cwd, err := os.Getwd()
   413  	if err != nil {
   414  		return "", fmt.Errorf("failed to get current working directory: %w", err)
   415  	}
   416  	// DO NOT name this file "kubelet" - you will overwrite the kubelet binary and be very confused :)
   417  	return filepath.Join(cwd, "kubelet-config"), nil
   418  }
   419  
   420  func KubeletConfigDirCWDDir() (string, error) {
   421  	cwd, err := os.Getwd()
   422  	if err != nil {
   423  		return "", fmt.Errorf("failed to get current working directory: %w", err)
   424  	}
   425  	dir := filepath.Join(cwd, "kubelet.conf.d")
   426  	if err := os.MkdirAll(dir, 0755); err != nil {
   427  		return "", err
   428  	}
   429  	return dir, nil
   430  }
   431  
   432  // like createKubeconfig, but creates kubeconfig at current-working-directory/kubeconfig
   433  // returns a fully-qualified path to the kubeconfig file
   434  func createKubeconfigCWD() (string, error) {
   435  	kubeconfigPath, err := kubeconfigCWDPath()
   436  	if err != nil {
   437  		return "", err
   438  	}
   439  
   440  	if err = createKubeconfig(kubeconfigPath); err != nil {
   441  		return "", err
   442  	}
   443  	return kubeconfigPath, nil
   444  }
   445  
   446  // adjustArgsForSystemd escape special characters in kubelet arguments for systemd. Systemd
   447  // may try to do auto expansion without escaping.
   448  func adjustArgsForSystemd(args []string) {
   449  	for i := range args {
   450  		args[i] = strings.Replace(args[i], "%", "%%", -1)
   451  		args[i] = strings.Replace(args[i], "$", "$$", -1)
   452  	}
   453  }
   454  

View as plain text