1
16
17 package services
18
19 import (
20 "flag"
21 "fmt"
22 "os"
23 "os/exec"
24 "path/filepath"
25 "regexp"
26 "strconv"
27 "strings"
28 "time"
29
30 cliflag "k8s.io/component-base/cli/flag"
31 "k8s.io/klog/v2"
32 kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
33
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/kubernetes/cmd/kubelet/app/options"
36 "k8s.io/kubernetes/pkg/cluster/ports"
37 kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
38 "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
39 kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
40 utilfs "k8s.io/kubernetes/pkg/util/filesystem"
41 "k8s.io/kubernetes/test/e2e/framework"
42 "k8s.io/kubernetes/test/e2e_node/builder"
43 "k8s.io/kubernetes/test/e2e_node/remote"
44 )
45
46
47
48
// args is a flag.Value that collects whitespace-separated command line
// arguments across repeated uses of the same flag.
type args []string

// String renders the collected arguments in the default slice format.
// It is part of the flag.Value interface.
func (a *args) String() string {
	return fmt.Sprintf("%v", *a)
}

// Set splits value on whitespace and appends every resulting field to the
// arguments collected so far. It never fails. It is part of the flag.Value
// interface.
func (a *args) Set(value string) error {
	for _, field := range strings.Fields(value) {
		*a = append(*a, field)
	}
	return nil
}
63
64
65 var kubeletArgs args
66 var kubeletConfigFile = "./kubeletconfig.yaml"
67
68 func init() {
69 flag.Var(&kubeletArgs, "kubelet-flags", "Kubelet flags passed to kubelet, this will override default kubelet flags in the test. Flags specified in multiple kubelet-flags will be concatenate. Deprecated, see: --kubelet-config-file.")
70 if flag.Lookup("kubelet-config-file") == nil {
71 flag.StringVar(&kubeletConfigFile, "kubelet-config-file", kubeletConfigFile, "The base KubeletConfiguration to use when setting up the kubelet. This configuration will then be minimially modified to support requirements from the test suite.")
72 }
73 }
74
75
76
77 func RunKubelet(featureGates map[string]bool) {
78 var err error
79
80
81 e := NewE2EServices(true )
82 defer e.Stop()
83 e.kubelet, err = e.startKubelet(featureGates)
84 if err != nil {
85 klog.Fatalf("Failed to start kubelet: %v", err)
86 }
87
88 waitForTerminationSignal()
89 }
90
// KubeletRootDirectory is the directory passed to the kubelet as --root-dir.
const KubeletRootDirectory = "/var/lib/kubelet"
95
96
97 var kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
98
99 func baseKubeConfiguration(cfgPath string) (*kubeletconfig.KubeletConfiguration, error) {
100 cfgPath, err := filepath.Abs(cfgPath)
101 if err != nil {
102 return nil, err
103 }
104
105 _, err = os.Stat(cfgPath)
106 if err != nil {
107
108
109 if !os.IsNotExist(err) {
110 return nil, err
111 }
112
113
114
115 kc, err := options.NewKubeletConfiguration()
116 if err != nil {
117 return nil, err
118 }
119
120
121
122
123
124
125
126 kc.CgroupRoot = "/"
127 kc.VolumeStatsAggPeriod = metav1.Duration{Duration: 10 * time.Second}
128 kc.SerializeImagePulls = false
129 kc.FileCheckFrequency = metav1.Duration{Duration: 10 * time.Second}
130 kc.PodCIDR = "10.100.0.0/24"
131 kc.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 30 * time.Second}
132 kc.EvictionHard = map[string]string{
133 "memory.available": "250Mi",
134 "nodefs.available": "10%",
135 "nodefs.inodesFree": "5%",
136 }
137 kc.EvictionMinimumReclaim = map[string]string{
138 "nodefs.available": "5%",
139 "nodefs.inodesFree": "5%",
140 }
141
142 return kc, nil
143 }
144
145 loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, cfgPath)
146 if err != nil {
147 return nil, err
148 }
149
150 return loader.Load()
151 }
152
153
154
155 func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error) {
156 klog.Info("Starting kubelet")
157
158 framework.Logf("Standalone mode: %v", framework.TestContext.StandaloneMode)
159
160 var kubeconfigPath string
161
162 if !framework.TestContext.StandaloneMode {
163 var err error
164
165 kubeconfigPath, err = createKubeconfigCWD()
166 if err != nil {
167 return nil, err
168 }
169 }
170
171
172 kubeletConfigPath, err := kubeletConfigCWDPath()
173 if err != nil {
174 return nil, err
175 }
176
177
178 framework.TestContext.KubeletConfigDropinDir, err = KubeletConfigDirCWDDir()
179 if err != nil {
180 return nil, err
181 }
182
183
184 podPath, err := createPodDirectory()
185 if err != nil {
186 return nil, err
187 }
188 e.rmDirs = append(e.rmDirs, podPath)
189 err = createRootDirectory(KubeletRootDirectory)
190 if err != nil {
191 return nil, err
192 }
193
194 lookup := flag.Lookup("kubelet-config-file")
195 if lookup != nil {
196 kubeletConfigFile = lookup.Value.String()
197 }
198 kc, err := baseKubeConfiguration(kubeletConfigFile)
199 if err != nil {
200 return nil, fmt.Errorf("failed to load base kubelet configuration: %w", err)
201 }
202
203
204
205
206
207 kc.Authentication.Anonymous.Enabled = true
208
209 kc.Authentication.Webhook.Enabled = false
210
211 kc.Authorization.Mode = kubeletconfig.KubeletAuthorizationModeAlwaysAllow
212
213 kc.ReadOnlyPort = ports.KubeletReadOnlyPort
214
215
216 kc.StaticPodPath = podPath
217
218 var killCommand, restartCommand *exec.Cmd
219 var isSystemd bool
220 var unitName string
221
222 cmdArgs := []string{}
223 if systemdRun, err := exec.LookPath("systemd-run"); err == nil {
224
225
226
227
228 isSystemd = true
229
230
231
232 logLocation := "StandardError=file:"
233 if version, verr := exec.Command("systemd-run", "--version").Output(); verr == nil {
234
235
236 re := regexp.MustCompile(`systemd (\d+)`)
237 if match := re.FindSubmatch(version); len(match) > 1 {
238 num, _ := strconv.Atoi(string(match[1]))
239 if num >= 240 {
240 logLocation = "StandardError=append:"
241 }
242 }
243 }
244
245
246 cwd, _ := os.Getwd()
247
248 unitTimestamp := remote.GetTimestampFromWorkspaceDir(cwd)
249 unitName = fmt.Sprintf("kubelet-%s.service", unitTimestamp)
250 cmdArgs = append(cmdArgs,
251 systemdRun,
252 "-p", "Delegate=true",
253 "-p", logLocation+framework.TestContext.ReportDir+"/kubelet.log",
254 "--unit="+unitName,
255 "--slice=runtime.slice",
256 "--remain-after-exit",
257 builder.GetKubeletServerBin())
258
259 killCommand = exec.Command("systemctl", "kill", unitName)
260 restartCommand = exec.Command("systemctl", "restart", unitName)
261
262 kc.KubeletCgroups = "/kubelet.slice"
263 } else {
264 cmdArgs = append(cmdArgs, builder.GetKubeletServerBin())
265
266 cmdArgs = append(cmdArgs, "--runtime-cgroups=/docker-daemon")
267
268 kc.KubeletCgroups = "/kubelet"
269
270 kc.SystemCgroups = "/system"
271 }
272
273 if !framework.TestContext.StandaloneMode {
274 cmdArgs = append(cmdArgs,
275 "--kubeconfig", kubeconfigPath,
276 )
277 }
278
279 cmdArgs = append(cmdArgs,
280 "--root-dir", KubeletRootDirectory,
281 "--v", LogVerbosityLevel,
282 )
283
284
285
286 if len(featureGates) > 0 {
287 cmdArgs = append(cmdArgs, "--feature-gates", cliflag.NewMapStringBool(&featureGates).String())
288 kc.FeatureGates = featureGates
289 }
290
291
292 cmdArgs = append(cmdArgs, "--config-dir", framework.TestContext.KubeletConfigDropinDir)
293
294
295 if framework.TestContext.NodeName != "" {
296 cmdArgs = append(cmdArgs, "--hostname-override", framework.TestContext.NodeName)
297 }
298
299 if framework.TestContext.ContainerRuntimeEndpoint != "" {
300 cmdArgs = append(cmdArgs, "--container-runtime-endpoint", framework.TestContext.ContainerRuntimeEndpoint)
301 }
302
303 if framework.TestContext.ImageServiceEndpoint != "" {
304 cmdArgs = append(cmdArgs, "--image-service-endpoint", framework.TestContext.ImageServiceEndpoint)
305 }
306
307 if err := WriteKubeletConfigFile(kc, kubeletConfigPath); err != nil {
308 return nil, err
309 }
310
311 cmdArgs = append(cmdArgs, "--config", kubeletConfigPath)
312
313
314 cmdArgs = append(cmdArgs, kubeletArgs...)
315
316
317 if isSystemd {
318 adjustArgsForSystemd(cmdArgs)
319 }
320
321 cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
322 restartOnExit := framework.TestContext.RestartKubelet
323 server := newServer(
324 "kubelet",
325 cmd,
326 killCommand,
327 restartCommand,
328 []string{kubeletHealthCheckURL},
329 "kubelet.log",
330 e.monitorParent,
331 restartOnExit,
332 unitName)
333 return server, server.start()
334 }
335
336
337 func WriteKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path string) error {
338 data, err := kubeletconfigcodec.EncodeKubeletConfig(internal, kubeletconfigv1beta1.SchemeGroupVersion)
339 if err != nil {
340 return err
341 }
342
343 dir := filepath.Dir(path)
344 if err := os.MkdirAll(dir, 0755); err != nil {
345 return err
346 }
347
348 if err := os.WriteFile(path, data, 0755); err != nil {
349 return err
350 }
351 return nil
352 }
353
354
// createPodDirectory creates a fresh, uniquely named directory whose name
// starts with "static-pods" inside the current working directory and
// returns its path.
func createPodDirectory() (string, error) {
	workDir, err := os.Getwd()
	if err != nil {
		return "", fmt.Errorf("failed to get current working directory: %w", err)
	}

	podDir, err := os.MkdirTemp(workDir, "static-pods")
	if err != nil {
		return "", fmt.Errorf("failed to create static pod directory: %w", err)
	}

	return podDir, nil
}
366
367
368 func createKubeconfig(path string) error {
369 kubeconfig := []byte(fmt.Sprintf(`apiVersion: v1
370 kind: Config
371 users:
372 - name: kubelet
373 user:
374 token: %s
375 clusters:
376 - cluster:
377 server: %s
378 insecure-skip-tls-verify: true
379 name: local
380 contexts:
381 - context:
382 cluster: local
383 user: kubelet
384 name: local-context
385 current-context: local-context`, framework.TestContext.BearerToken, getAPIServerClientURL()))
386
387 if err := os.WriteFile(path, kubeconfig, 0666); err != nil {
388 return err
389 }
390 return nil
391 }
392
// createRootDirectory makes sure something exists at path. If nothing does,
// the directory (and any missing parents) is created with mode 0755. An
// already existing path is left untouched, and any other error from
// inspecting the path is returned.
func createRootDirectory(path string) error {
	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		// Already there; nothing to do.
		return nil
	case os.IsNotExist(statErr):
		return os.MkdirAll(path, os.FileMode(0755))
	default:
		return statErr
	}
}
402
// kubeconfigCWDPath returns the path of the file named "kubeconfig" in the
// current working directory. The file itself is not touched.
func kubeconfigCWDPath() (string, error) {
	workDir, err := os.Getwd()
	if err != nil {
		return "", fmt.Errorf("failed to get current working directory: %w", err)
	}
	kubeconfig := filepath.Join(workDir, "kubeconfig")
	return kubeconfig, nil
}
410
// kubeletConfigCWDPath returns the path of the file named "kubelet-config"
// in the current working directory. The file itself is not touched.
func kubeletConfigCWDPath() (string, error) {
	workDir, err := os.Getwd()
	if err != nil {
		return "", fmt.Errorf("failed to get current working directory: %w", err)
	}
	configPath := filepath.Join(workDir, "kubelet-config")
	return configPath, nil
}
419
// KubeletConfigDirCWDDir returns the path of the "kubelet.conf.d" directory
// inside the current working directory, creating it (mode 0755) when it
// does not exist yet.
func KubeletConfigDirCWDDir() (string, error) {
	workDir, err := os.Getwd()
	if err != nil {
		return "", fmt.Errorf("failed to get current working directory: %w", err)
	}

	dropinDir := filepath.Join(workDir, "kubelet.conf.d")
	if mkErr := os.MkdirAll(dropinDir, 0755); mkErr != nil {
		return "", mkErr
	}
	return dropinDir, nil
}
431
432
433
434 func createKubeconfigCWD() (string, error) {
435 kubeconfigPath, err := kubeconfigCWDPath()
436 if err != nil {
437 return "", err
438 }
439
440 if err = createKubeconfig(kubeconfigPath); err != nil {
441 return "", err
442 }
443 return kubeconfigPath, nil
444 }
445
446
447
// adjustArgsForSystemd rewrites args in place, doubling every '%' and '$'
// character in each element. The slice itself is modified; nothing is
// returned.
func adjustArgsForSystemd(args []string) {
	// The two substitutions cannot interfere with each other (neither
	// produces the other's character), so a single pass is equivalent to
	// applying them one after the other.
	escaper := strings.NewReplacer("%", "%%", "$", "$$")
	for idx, arg := range args {
		args[idx] = escaper.Replace(arg)
	}
}
454
View as plain text