1
16
17
18 package app
19
20 import (
21 "context"
22 "crypto/tls"
23 "encoding/json"
24 "errors"
25 "fmt"
26 "io"
27 "io/fs"
28 "math"
29 "net"
30 "net/http"
31 "os"
32 "path/filepath"
33 "strconv"
34 "strings"
35 "time"
36
37 "github.com/coreos/go-systemd/v22/daemon"
38 jsonpatch "github.com/evanphx/json-patch"
39 "github.com/spf13/cobra"
40 "github.com/spf13/pflag"
41 "google.golang.org/grpc/codes"
42 "google.golang.org/grpc/status"
43 "k8s.io/klog/v2"
44 "k8s.io/mount-utils"
45
46 cadvisorapi "github.com/google/cadvisor/info/v1"
47 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
48 otelsdkresource "go.opentelemetry.io/otel/sdk/resource"
49 semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
50 oteltrace "go.opentelemetry.io/otel/trace"
51 v1 "k8s.io/api/core/v1"
52 "k8s.io/apimachinery/pkg/api/resource"
53 "k8s.io/apimachinery/pkg/runtime"
54 "k8s.io/apimachinery/pkg/types"
55 utilnet "k8s.io/apimachinery/pkg/util/net"
56 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
57 "k8s.io/apimachinery/pkg/util/sets"
58 "k8s.io/apimachinery/pkg/util/validation/field"
59 "k8s.io/apimachinery/pkg/util/wait"
60 genericapiserver "k8s.io/apiserver/pkg/server"
61 "k8s.io/apiserver/pkg/server/healthz"
62 utilfeature "k8s.io/apiserver/pkg/util/feature"
63 clientset "k8s.io/client-go/kubernetes"
64 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
65 restclient "k8s.io/client-go/rest"
66 "k8s.io/client-go/tools/clientcmd"
67 "k8s.io/client-go/tools/record"
68 certutil "k8s.io/client-go/util/cert"
69 "k8s.io/client-go/util/certificate"
70 "k8s.io/client-go/util/connrotation"
71 "k8s.io/client-go/util/keyutil"
72 cloudprovider "k8s.io/cloud-provider"
73 cliflag "k8s.io/component-base/cli/flag"
74 "k8s.io/component-base/configz"
75 "k8s.io/component-base/featuregate"
76 "k8s.io/component-base/logs"
77 logsapi "k8s.io/component-base/logs/api/v1"
78 "k8s.io/component-base/metrics"
79 "k8s.io/component-base/metrics/legacyregistry"
80 "k8s.io/component-base/tracing"
81 "k8s.io/component-base/version"
82 "k8s.io/component-base/version/verflag"
83 nodeutil "k8s.io/component-helpers/node/util"
84 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
85 kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
86 "k8s.io/kubernetes/cmd/kubelet/app/options"
87 "k8s.io/kubernetes/pkg/api/legacyscheme"
88 "k8s.io/kubernetes/pkg/capabilities"
89 "k8s.io/kubernetes/pkg/credentialprovider"
90 "k8s.io/kubernetes/pkg/features"
91 "k8s.io/kubernetes/pkg/kubelet"
92 kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
93 kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
94 kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
95 "k8s.io/kubernetes/pkg/kubelet/cadvisor"
96 kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
97 "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
98 "k8s.io/kubernetes/pkg/kubelet/cm"
99 "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
100 "k8s.io/kubernetes/pkg/kubelet/config"
101 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
102 "k8s.io/kubernetes/pkg/kubelet/eviction"
103 evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
104 "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
105 kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
106 "k8s.io/kubernetes/pkg/kubelet/server"
107 "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
108 kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
109 kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
110 utilfs "k8s.io/kubernetes/pkg/util/filesystem"
111 "k8s.io/kubernetes/pkg/util/flock"
112 "k8s.io/kubernetes/pkg/util/oom"
113 "k8s.io/kubernetes/pkg/util/rlimit"
114 "k8s.io/kubernetes/pkg/volume/util/hostutil"
115 "k8s.io/kubernetes/pkg/volume/util/subpath"
116 "k8s.io/utils/cpuset"
117 "k8s.io/utils/exec"
118 netutils "k8s.io/utils/net"
119 )
120
121 func init() {
122 utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
123 }
124
// componentKubelet is the name of this component. It is used as the cobra
// command name, the name of the command's flag set, and the Component of the
// event source attached to recorded events.
const componentKubelet = "kubelet"
129
130
131 func NewKubeletCommand() *cobra.Command {
132 cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
133 cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
134 kubeletFlags := options.NewKubeletFlags()
135
136 kubeletConfig, err := options.NewKubeletConfiguration()
137
138 if err != nil {
139 klog.ErrorS(err, "Failed to create a new kubelet configuration")
140 os.Exit(1)
141 }
142
143 cmd := &cobra.Command{
144 Use: componentKubelet,
145 Long: `The kubelet is the primary "node agent" that runs on each
146 node. It can register the node with the apiserver using one of: the hostname; a flag to
147 override the hostname; or specific logic for a cloud provider.
148
149 The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
150 that describes a pod. The kubelet takes a set of PodSpecs that are provided through
151 various mechanisms (primarily through the apiserver) and ensures that the containers
152 described in those PodSpecs are running and healthy. The kubelet doesn't manage
153 containers which were not created by Kubernetes.
154
155 Other than from an PodSpec from the apiserver, there are two ways that a container
156 manifest can be provided to the Kubelet.
157
158 File: Path passed as a flag on the command line. Files under this path will be monitored
159 periodically for updates. The monitoring period is 20s by default and is configurable
160 via a flag.
161
162 HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
163 is checked every 20 seconds (also configurable with a flag).`,
164
165
166
167
168 DisableFlagParsing: true,
169 SilenceUsage: true,
170 RunE: func(cmd *cobra.Command, args []string) error {
171
172 if err := cleanFlagSet.Parse(args); err != nil {
173 return fmt.Errorf("failed to parse kubelet flag: %w", err)
174 }
175
176
177 cmds := cleanFlagSet.Args()
178 if len(cmds) > 0 {
179 return fmt.Errorf("unknown command %+s", cmds[0])
180 }
181
182
183 help, err := cleanFlagSet.GetBool("help")
184 if err != nil {
185 return errors.New(`"help" flag is non-bool, programmer error, please correct`)
186 }
187 if help {
188 return cmd.Help()
189 }
190
191
192 verflag.PrintAndExitIfRequested()
193
194
195 if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
196 return fmt.Errorf("failed to set feature gates from initial flags-based config: %w", err)
197 }
198
199
200 if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
201 return fmt.Errorf("failed to validate kubelet flags: %w", err)
202 }
203
204 if cleanFlagSet.Changed("pod-infra-container-image") {
205 klog.InfoS("--pod-infra-container-image will not be pruned by the image garbage collector in kubelet and should also be set in the remote runtime")
206 _ = cmd.Flags().MarkDeprecated("pod-infra-container-image", "--pod-infra-container-image will be removed in 1.30. Image garbage collector will get sandbox image information from CRI.")
207 }
208
209
210 if len(kubeletFlags.KubeletConfigFile) > 0 {
211 kubeletConfig, err = loadConfigFile(kubeletFlags.KubeletConfigFile)
212 if err != nil {
213 return fmt.Errorf("failed to load kubelet config file, path: %s, error: %w", kubeletFlags.KubeletConfigFile, err)
214 }
215 }
216
217 if len(kubeletFlags.KubeletDropinConfigDirectory) > 0 {
218 if err := mergeKubeletConfigurations(kubeletConfig, kubeletFlags.KubeletDropinConfigDirectory); err != nil {
219 return fmt.Errorf("failed to merge kubelet configs: %w", err)
220 }
221 }
222
223 if len(kubeletFlags.KubeletConfigFile) > 0 || len(kubeletFlags.KubeletDropinConfigDirectory) > 0 {
224
225
226
227 if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
228 return fmt.Errorf("failed to precedence kubeletConfigFlag: %w", err)
229 }
230
231 if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
232 return fmt.Errorf("failed to set feature gates from initial flags-based config: %w", err)
233 }
234 }
235
236
237 logs.InitLogs()
238 if err := logsapi.ValidateAndApplyAsField(&kubeletConfig.Logging, utilfeature.DefaultFeatureGate, field.NewPath("logging")); err != nil {
239 return fmt.Errorf("initialize logging: %v", err)
240 }
241 cliflag.PrintFlags(cleanFlagSet)
242
243
244
245 if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig, utilfeature.DefaultFeatureGate); err != nil {
246 return fmt.Errorf("failed to validate kubelet configuration, error: %w, path: %s", err, kubeletConfig)
247 }
248
249 if (kubeletConfig.KubeletCgroups != "" && kubeletConfig.KubeReservedCgroup != "") && (strings.Index(kubeletConfig.KubeletCgroups, kubeletConfig.KubeReservedCgroup) != 0) {
250 klog.InfoS("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
251 }
252
253
254 kubeletServer := &options.KubeletServer{
255 KubeletFlags: *kubeletFlags,
256 KubeletConfiguration: *kubeletConfig,
257 }
258
259
260 kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
261 if err != nil {
262 return fmt.Errorf("failed to construct kubelet dependencies: %w", err)
263 }
264
265 if err := checkPermissions(); err != nil {
266 klog.ErrorS(err, "kubelet running with insufficient permissions")
267 }
268
269
270 config := kubeletServer.KubeletConfiguration.DeepCopy()
271 for k := range config.StaticPodURLHeader {
272 config.StaticPodURLHeader[k] = []string{"<masked>"}
273 }
274
275 klog.V(5).InfoS("KubeletConfiguration", "configuration", klog.Format(config))
276
277
278 ctx := genericapiserver.SetupSignalContext()
279
280 utilfeature.DefaultMutableFeatureGate.AddMetrics()
281
282 return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
283 },
284 }
285
286
287 kubeletFlags.AddFlags(cleanFlagSet)
288 options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
289 options.AddGlobalFlags(cleanFlagSet)
290 cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
291
292
293 const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
294 cmd.SetUsageFunc(func(cmd *cobra.Command) error {
295 fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
296 return nil
297 })
298 cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
299 fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
300 })
301
302 return cmd
303 }
304
305
306
307
308
309
310
311 func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConfiguration, kubeletDropInConfigDir string) error {
312 const dropinFileExtension = ".conf"
313 baseKubeletConfigJSON, err := json.Marshal(kubeletConfig)
314 if err != nil {
315 return fmt.Errorf("failed to marshal base config: %w", err)
316 }
317
318 if err := filepath.WalkDir(kubeletDropInConfigDir, func(path string, info fs.DirEntry, err error) error {
319 if err != nil {
320 return err
321 }
322 if !info.IsDir() && filepath.Ext(info.Name()) == dropinFileExtension {
323 dropinConfigJSON, err := loadDropinConfigFileIntoJSON(path)
324 if err != nil {
325 return fmt.Errorf("failed to load kubelet dropin file, path: %s, error: %w", path, err)
326 }
327 mergedConfigJSON, err := jsonpatch.MergePatch(baseKubeletConfigJSON, dropinConfigJSON)
328 if err != nil {
329 return fmt.Errorf("failed to merge drop-in and current config: %w", err)
330 }
331 baseKubeletConfigJSON = mergedConfigJSON
332 }
333 return nil
334 }); err != nil {
335 return fmt.Errorf("failed to walk through kubelet dropin directory %q: %w", kubeletDropInConfigDir, err)
336 }
337
338 if err := json.Unmarshal(baseKubeletConfigJSON, kubeletConfig); err != nil {
339 return fmt.Errorf("failed to unmarshal merged JSON into kubelet configuration: %w", err)
340 }
341 return nil
342 }
343
344
345
346 func newFlagSetWithGlobals() *pflag.FlagSet {
347 fs := pflag.NewFlagSet("", pflag.ExitOnError)
348
349 fs.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
350
351 options.AddGlobalFlags(fs)
352 return fs
353 }
354
355
356
357 func newFakeFlagSet(fs *pflag.FlagSet) *pflag.FlagSet {
358 ret := pflag.NewFlagSet("", pflag.ExitOnError)
359 ret.SetNormalizeFunc(fs.GetNormalizeFunc())
360 fs.VisitAll(func(f *pflag.Flag) {
361 ret.VarP(cliflag.NoOp{}, f.Name, f.Shorthand, f.Usage)
362 })
363 return ret
364 }
365
366
367
368
369
370 func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration, args []string) error {
371
372
373 fs := newFakeFlagSet(newFlagSetWithGlobals())
374
375 options.NewKubeletFlags().AddFlags(fs)
376
377 options.AddKubeletConfigFlags(fs, kc)
378
379 original := kc.FeatureGates
380
381 fs.SetOutput(io.Discard)
382
383 if err := fs.Parse(args); err != nil {
384 return err
385 }
386
387 for k, v := range original {
388 if _, ok := kc.FeatureGates[k]; !ok {
389 kc.FeatureGates[k] = v
390 }
391 }
392 return nil
393 }
394
395 func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
396 const errFmt = "failed to load Kubelet config file %s, error %v"
397
398 kubeletConfigFile, err := filepath.Abs(name)
399 if err != nil {
400 return nil, fmt.Errorf(errFmt, name, err)
401 }
402 loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile)
403 if err != nil {
404 return nil, fmt.Errorf(errFmt, name, err)
405 }
406 kc, err := loader.Load()
407 if err != nil {
408 return nil, fmt.Errorf(errFmt, name, err)
409 }
410
411
412
413
414 if kc.EvictionHard == nil {
415 kc.EvictionHard = eviction.DefaultEvictionHard
416 }
417 return kc, err
418 }
419
420 func loadDropinConfigFileIntoJSON(name string) ([]byte, error) {
421 const errFmt = "failed to load drop-in kubelet config file %s, error %v"
422
423 kubeletConfigFile, err := filepath.Abs(name)
424 if err != nil {
425 return nil, fmt.Errorf(errFmt, name, err)
426 }
427 loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile)
428 if err != nil {
429 return nil, fmt.Errorf(errFmt, name, err)
430 }
431 return loader.LoadIntoJSON()
432 }
433
434
435
436 func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
437
438 tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
439 if err != nil {
440 return nil, err
441 }
442
443 mounter := mount.New(s.ExperimentalMounterPath)
444 subpather := subpath.New(mounter)
445 hu := hostutil.NewHostUtil()
446 var pluginRunner = exec.New()
447
448 plugins, err := ProbeVolumePlugins(featureGate)
449 if err != nil {
450 return nil, err
451 }
452 tp := oteltrace.NewNoopTracerProvider()
453 if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
454 tp, err = newTracerProvider(s)
455 if err != nil {
456 return nil, err
457 }
458 }
459 return &kubelet.Dependencies{
460 Auth: nil,
461 CAdvisorInterface: nil,
462 Cloud: nil,
463 ContainerManager: nil,
464 KubeClient: nil,
465 HeartbeatClient: nil,
466 EventClient: nil,
467 TracerProvider: tp,
468 HostUtil: hu,
469 Mounter: mounter,
470 Subpather: subpather,
471 OOMAdjuster: oom.NewOOMAdjuster(),
472 OSInterface: kubecontainer.RealOS{},
473 VolumePlugins: plugins,
474 DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
475 TLSOptions: tlsOptions}, nil
476 }
477
478
479
480
481
482 func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
483
484 klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
485
486 klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
487
488 if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
489 return fmt.Errorf("failed OS init: %w", err)
490 }
491 if err := run(ctx, s, kubeDeps, featureGate); err != nil {
492 return fmt.Errorf("failed to run Kubelet: %w", err)
493 }
494 return nil
495 }
496
497 func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
498 scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
499 if err != nil {
500 return err
501 }
502 versioned := kubeletconfigv1beta1.KubeletConfiguration{}
503 if err := scheme.Convert(kc, &versioned, nil); err != nil {
504 return err
505 }
506 cz.Set(versioned)
507 return nil
508 }
509
510 func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
511 cz, err := configz.New("kubeletconfig")
512 if err != nil {
513 klog.ErrorS(err, "Failed to register configz")
514 return err
515 }
516 if err := setConfigz(cz, kc); err != nil {
517 klog.ErrorS(err, "Failed to register config")
518 return err
519 }
520 return nil
521 }
522
523
524 func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
525 if kubeDeps.Recorder != nil {
526 return
527 }
528 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
529 kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
530 eventBroadcaster.StartStructuredLogging(3)
531 if kubeDeps.EventClient != nil {
532 klog.V(4).InfoS("Sending events to api server")
533 eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
534 } else {
535 klog.InfoS("No api server defined - no events will be sent to API server")
536 }
537 }
538
539 func getReservedCPUs(machineInfo *cadvisorapi.MachineInfo, cpus string) (cpuset.CPUSet, error) {
540 emptyCPUSet := cpuset.New()
541
542 if cpus == "" {
543 return emptyCPUSet, nil
544 }
545
546 topo, err := topology.Discover(machineInfo)
547 if err != nil {
548 return emptyCPUSet, fmt.Errorf("unable to discover CPU topology info: %s", err)
549 }
550 reservedCPUSet, err := cpuset.Parse(cpus)
551 if err != nil {
552 return emptyCPUSet, fmt.Errorf("unable to parse reserved-cpus list: %s", err)
553 }
554 allCPUSet := topo.CPUDetails.CPUs()
555 if !reservedCPUSet.IsSubsetOf(allCPUSet) {
556 return emptyCPUSet, fmt.Errorf("reserved-cpus: %s is not a subset of online-cpus: %s", cpus, allCPUSet.String())
557 }
558 return reservedCPUSet, nil
559 }
560
561 func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
562
563 err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
564 if err != nil {
565 return err
566 }
567
568 if err := options.ValidateKubeletServer(s); err != nil {
569 return err
570 }
571
572
573 if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
574 !isCgroup2UnifiedMode() {
575 klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
576 }
577
578 if s.ExitOnLockContention && s.LockFilePath == "" {
579 return errors.New("cannot exit on lock file contention: no lock file specified")
580 }
581 done := make(chan struct{})
582 if s.LockFilePath != "" {
583 klog.InfoS("Acquiring file lock", "path", s.LockFilePath)
584 if err := flock.Acquire(s.LockFilePath); err != nil {
585 return fmt.Errorf("unable to acquire file lock on %q: %w", s.LockFilePath, err)
586 }
587 if s.ExitOnLockContention {
588 klog.InfoS("Watching for inotify events", "path", s.LockFilePath)
589 if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
590 return err
591 }
592 }
593 }
594
595
596 err = initConfigz(&s.KubeletConfiguration)
597 if err != nil {
598 klog.ErrorS(err, "Failed to register kubelet configuration with configz")
599 }
600
601 if len(s.ShowHiddenMetricsForVersion) > 0 {
602 metrics.SetShowHidden()
603 }
604
605
606 standaloneMode := true
607 if len(s.KubeConfig) > 0 {
608 standaloneMode = false
609 }
610
611 if kubeDeps == nil {
612 kubeDeps, err = UnsecuredDependencies(s, featureGate)
613 if err != nil {
614 return err
615 }
616 }
617
618 if kubeDeps.Cloud == nil {
619 if !cloudprovider.IsExternal(s.CloudProvider) {
620 cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
621 cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
622 if err != nil {
623 return err
624 }
625 if cloud != nil {
626 klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
627 }
628 kubeDeps.Cloud = cloud
629 }
630 }
631
632 hostName, err := nodeutil.GetHostname(s.HostnameOverride)
633 if err != nil {
634 return err
635 }
636 nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
637 if err != nil {
638 return err
639 }
640
641
642 switch {
643 case standaloneMode:
644 kubeDeps.KubeClient = nil
645 kubeDeps.EventClient = nil
646 kubeDeps.HeartbeatClient = nil
647 klog.InfoS("Standalone mode, no API client")
648
649 case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
650 clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
651 if err != nil {
652 return err
653 }
654 if onHeartbeatFailure == nil {
655 return errors.New("onHeartbeatFailure must be a valid function other than nil")
656 }
657 kubeDeps.OnHeartbeatFailure = onHeartbeatFailure
658
659 kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
660 if err != nil {
661 return fmt.Errorf("failed to initialize kubelet client: %w", err)
662 }
663
664
665 eventClientConfig := *clientConfig
666 eventClientConfig.QPS = float32(s.EventRecordQPS)
667 eventClientConfig.Burst = int(s.EventBurst)
668 kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
669 if err != nil {
670 return fmt.Errorf("failed to initialize kubelet event client: %w", err)
671 }
672
673
674 heartbeatClientConfig := *clientConfig
675 heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
676
677 leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
678 if heartbeatClientConfig.Timeout > leaseTimeout {
679 heartbeatClientConfig.Timeout = leaseTimeout
680 }
681
682 heartbeatClientConfig.QPS = float32(-1)
683 kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
684 if err != nil {
685 return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
686 }
687 }
688
689 if kubeDeps.Auth == nil {
690 auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
691 if err != nil {
692 return err
693 }
694 kubeDeps.Auth = auth
695 runAuthenticatorCAReload(ctx.Done())
696 }
697
698 if err := kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps); err != nil {
699 return err
700 }
701
702
703 if utilfeature.DefaultFeatureGate.Enabled(features.KubeletCgroupDriverFromCRI) {
704 if err := getCgroupDriverFromCRI(ctx, s, kubeDeps); err != nil {
705 return err
706 }
707 }
708
709 var cgroupRoots []string
710 nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
711 cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
712 kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
713 if err != nil {
714 klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
715 } else if kubeletCgroup != "" {
716 cgroupRoots = append(cgroupRoots, kubeletCgroup)
717 }
718
719 if s.RuntimeCgroups != "" {
720
721 cgroupRoots = append(cgroupRoots, s.RuntimeCgroups)
722 }
723
724 if s.SystemCgroups != "" {
725
726 cgroupRoots = append(cgroupRoots, s.SystemCgroups)
727 }
728
729 if kubeDeps.CAdvisorInterface == nil {
730 imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntimeEndpoint)
731 kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntimeEndpoint), s.LocalStorageCapacityIsolation)
732 if err != nil {
733 return err
734 }
735 }
736
737
738 makeEventRecorder(ctx, kubeDeps, nodeName)
739
740 if kubeDeps.ContainerManager == nil {
741 if s.CgroupsPerQOS && s.CgroupRoot == "" {
742 klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
743 s.CgroupRoot = "/"
744 }
745
746 machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
747 if err != nil {
748 return err
749 }
750 reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
751 if err != nil {
752 return err
753 }
754 if reservedSystemCPUs.Size() > 0 {
755
756 klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReserved", s.KubeReserved, "systemReserved", s.SystemReserved)
757 if s.KubeReserved != nil {
758 delete(s.KubeReserved, "cpu")
759 }
760 if s.SystemReserved == nil {
761 s.SystemReserved = make(map[string]string)
762 }
763 s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
764 klog.InfoS("After cpu setting is overwritten", "kubeReserved", s.KubeReserved, "systemReserved", s.SystemReserved)
765 }
766
767 kubeReserved, err := parseResourceList(s.KubeReserved)
768 if err != nil {
769 return fmt.Errorf("--kube-reserved value failed to parse: %w", err)
770 }
771 systemReserved, err := parseResourceList(s.SystemReserved)
772 if err != nil {
773 return fmt.Errorf("--system-reserved value failed to parse: %w", err)
774 }
775 var hardEvictionThresholds []evictionapi.Threshold
776
777 if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
778 hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
779 if err != nil {
780 return err
781 }
782 }
783 experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
784 if err != nil {
785 return fmt.Errorf("--qos-reserved value failed to parse: %w", err)
786 }
787
788 var cpuManagerPolicyOptions map[string]string
789 if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
790 cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
791 } else if s.CPUManagerPolicyOptions != nil {
792 return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
793 s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
794 }
795
796 var topologyManagerPolicyOptions map[string]string
797 if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManagerPolicyOptions) {
798 topologyManagerPolicyOptions = s.TopologyManagerPolicyOptions
799 } else if s.TopologyManagerPolicyOptions != nil {
800 return fmt.Errorf("topology manager policy options %v require feature gates %q enabled",
801 s.TopologyManagerPolicyOptions, features.TopologyManagerPolicyOptions)
802 }
803 if utilfeature.DefaultFeatureGate.Enabled(features.NodeSwap) {
804 if !isCgroup2UnifiedMode() && s.MemorySwap.SwapBehavior == kubelettypes.LimitedSwap {
805
806 return fmt.Errorf("swap feature is enabled and LimitedSwap but it is only supported with cgroupv2")
807 }
808 if !s.FailSwapOn && s.MemorySwap.SwapBehavior == "" {
809
810 klog.InfoS("NoSwap is set due to memorySwapBehavior not specified", "memorySwapBehavior", s.MemorySwap.SwapBehavior, "FailSwapOn", s.FailSwapOn)
811 }
812 }
813
814 kubeDeps.ContainerManager, err = cm.NewContainerManager(
815 kubeDeps.Mounter,
816 kubeDeps.CAdvisorInterface,
817 cm.NodeConfig{
818 NodeName: nodeName,
819 RuntimeCgroupsName: s.RuntimeCgroups,
820 SystemCgroupsName: s.SystemCgroups,
821 KubeletCgroupsName: s.KubeletCgroups,
822 KubeletOOMScoreAdj: s.OOMScoreAdj,
823 CgroupsPerQOS: s.CgroupsPerQOS,
824 CgroupRoot: s.CgroupRoot,
825 CgroupDriver: s.CgroupDriver,
826 KubeletRootDir: s.RootDirectory,
827 ProtectKernelDefaults: s.ProtectKernelDefaults,
828 NodeAllocatableConfig: cm.NodeAllocatableConfig{
829 KubeReservedCgroupName: s.KubeReservedCgroup,
830 SystemReservedCgroupName: s.SystemReservedCgroup,
831 EnforceNodeAllocatable: sets.New(s.EnforceNodeAllocatable...),
832 KubeReserved: kubeReserved,
833 SystemReserved: systemReserved,
834 ReservedSystemCPUs: reservedSystemCPUs,
835 HardEvictionThresholds: hardEvictionThresholds,
836 },
837 QOSReserved: *experimentalQOSReserved,
838 CPUManagerPolicy: s.CPUManagerPolicy,
839 CPUManagerPolicyOptions: cpuManagerPolicyOptions,
840 CPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
841 ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
842 ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
843 PodPidsLimit: s.PodPidsLimit,
844 EnforceCPULimits: s.CPUCFSQuota,
845 CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
846 TopologyManagerPolicy: s.TopologyManagerPolicy,
847 TopologyManagerScope: s.TopologyManagerScope,
848 TopologyManagerPolicyOptions: topologyManagerPolicyOptions,
849 },
850 s.FailSwapOn,
851 kubeDeps.Recorder,
852 kubeDeps.KubeClient,
853 )
854
855 if err != nil {
856 return err
857 }
858 }
859
860 if kubeDeps.PodStartupLatencyTracker == nil {
861 kubeDeps.PodStartupLatencyTracker = kubeletutil.NewPodStartupLatencyTracker()
862 }
863
864 if kubeDeps.NodeStartupLatencyTracker == nil {
865 kubeDeps.NodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
866 }
867
868
869 oomAdjuster := kubeDeps.OOMAdjuster
870 if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
871 klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
872 }
873
874 if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
875 return err
876 }
877
878 if s.HealthzPort > 0 {
879 mux := http.NewServeMux()
880 healthz.InstallHandler(mux)
881 go wait.Until(func() {
882 err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
883 if err != nil {
884 klog.ErrorS(err, "Failed to start healthz server")
885 }
886 }, 5*time.Second, wait.NeverStop)
887 }
888
889 if s.RunOnce {
890 return nil
891 }
892
893
894 go daemon.SdNotify(false, "READY=1")
895
896 select {
897 case <-done:
898 break
899 case <-ctx.Done():
900 break
901 }
902
903 return nil
904 }
905
906
907
908 func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp oteltrace.TracerProvider, nodeName types.NodeName) (*restclient.Config, func(), error) {
909 if s.RotateCertificates {
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927 klog.InfoS("Client rotation is on, will bootstrap in background")
928 certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
929 if err != nil {
930 return nil, nil, err
931 }
932
933
934 setContentTypeForClient(certConfig, s.ContentType)
935
936 kubeClientConfigOverrides(s, clientConfig)
937
938 clientCertificateManager, err := buildClientCertificateManager(certConfig, clientConfig, s.CertDirectory, nodeName)
939 if err != nil {
940 return nil, nil, err
941 }
942
943 legacyregistry.RawMustRegister(metrics.NewGaugeFunc(
944 &metrics.GaugeOpts{
945 Subsystem: kubeletmetrics.KubeletSubsystem,
946 Name: "certificate_manager_client_ttl_seconds",
947 Help: "Gauge of the TTL (time-to-live) of the Kubelet's client certificate. " +
948 "The value is in seconds until certificate expiry (negative if already expired). " +
949 "If client certificate is invalid or unused, the value will be +INF.",
950 StabilityLevel: metrics.ALPHA,
951 },
952 func() float64 {
953 if c := clientCertificateManager.Current(); c != nil && c.Leaf != nil {
954 return math.Trunc(time.Until(c.Leaf.NotAfter).Seconds())
955 }
956 return math.Inf(1)
957 },
958 ))
959
960
961 transportConfig := restclient.AnonymousClientConfig(clientConfig)
962
963
964
965
966 closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, transportConfig, clientCertificateManager, 5*time.Minute)
967 if err != nil {
968 return nil, nil, err
969 }
970 var onHeartbeatFailure func()
971
972
973
974
975
976
977 if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
978 klog.InfoS("HTTP2 has been explicitly disabled, Kubelet will forcefully close active connections on heartbeat failures")
979 onHeartbeatFailure = closeAllConns
980 } else {
981 onHeartbeatFailure = func() { utilnet.CloseIdleConnectionsFor(transportConfig.Transport) }
982 }
983
984 klog.V(2).InfoS("Starting client certificate rotation")
985 clientCertificateManager.Start()
986
987 return transportConfig, onHeartbeatFailure, nil
988 }
989
990 if len(s.BootstrapKubeconfig) > 0 {
991 if err := bootstrap.LoadClientCert(ctx, s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
992 return nil, nil, err
993 }
994 }
995
996 clientConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
997 &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig},
998 &clientcmd.ConfigOverrides{},
999 ).ClientConfig()
1000 if err != nil {
1001 return nil, nil, fmt.Errorf("invalid kubeconfig: %w", err)
1002 }
1003
1004 kubeClientConfigOverrides(s, clientConfig)
1005
1006
1007
1008
1009
1010
1011 var onHeartbeatFailure func()
1012 if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
1013 klog.InfoS("HTTP2 has been explicitly disabled, updating Kubelet client Dialer to forcefully close active connections on heartbeat failures")
1014 onHeartbeatFailure, err = updateDialer(clientConfig)
1015 if err != nil {
1016 return nil, nil, err
1017 }
1018 } else {
1019 onHeartbeatFailure = func() {
1020 utilnet.CloseIdleConnectionsFor(clientConfig.Transport)
1021 }
1022 }
1023 if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
1024 clientConfig.Wrap(tracing.WrapperFor(tp))
1025 }
1026 return clientConfig, onHeartbeatFailure, nil
1027 }
1028
1029
1030 func updateDialer(clientConfig *restclient.Config) (func(), error) {
1031 if clientConfig.Transport != nil || clientConfig.Dial != nil {
1032 return nil, fmt.Errorf("there is already a transport or dialer configured")
1033 }
1034 d := connrotation.NewDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext)
1035 clientConfig.Dial = d.DialContext
1036 return d.CloseAll, nil
1037 }
1038
1039
1040
1041
1042 func buildClientCertificateManager(certConfig, clientConfig *restclient.Config, certDir string, nodeName types.NodeName) (certificate.Manager, error) {
1043 newClientsetFn := func(current *tls.Certificate) (clientset.Interface, error) {
1044
1045
1046
1047
1048 config := certConfig
1049 if current != nil {
1050 config = clientConfig
1051 }
1052 return clientset.NewForConfig(config)
1053 }
1054
1055 return kubeletcertificate.NewKubeletClientCertificateManager(
1056 certDir,
1057 nodeName,
1058
1059
1060
1061
1062 clientConfig.CertData,
1063 clientConfig.KeyData,
1064
1065 clientConfig.CertFile,
1066 clientConfig.KeyFile,
1067 newClientsetFn,
1068 )
1069 }
1070
1071 func kubeClientConfigOverrides(s *options.KubeletServer, clientConfig *restclient.Config) {
1072 setContentTypeForClient(clientConfig, s.ContentType)
1073
1074 clientConfig.QPS = float32(s.KubeAPIQPS)
1075 clientConfig.Burst = int(s.KubeAPIBurst)
1076 }
1077
1078
1079
1080 func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
1081 if cloud == nil {
1082 return types.NodeName(hostname), nil
1083 }
1084
1085 instances, ok := cloud.Instances()
1086 if !ok {
1087 return "", fmt.Errorf("failed to get instances from cloud provider")
1088 }
1089
1090 nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
1091 if err != nil {
1092 return "", fmt.Errorf("error fetching current node name from cloud provider: %w", err)
1093 }
1094
1095 klog.V(2).InfoS("Cloud provider determined current node", "nodeName", klog.KRef("", string(nodeName)))
1096
1097 return nodeName, nil
1098 }
1099
1100
1101
1102 func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
1103 if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
1104 kc.TLSCertFile = filepath.Join(kf.CertDirectory, "kubelet.crt")
1105 kc.TLSPrivateKeyFile = filepath.Join(kf.CertDirectory, "kubelet.key")
1106
1107 canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
1108 if err != nil {
1109 return nil, err
1110 }
1111 if !canReadCertAndKey {
1112 hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
1113 if err != nil {
1114 return nil, err
1115 }
1116 cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
1117 if err != nil {
1118 return nil, fmt.Errorf("unable to generate self signed cert: %w", err)
1119 }
1120
1121 if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
1122 return nil, err
1123 }
1124
1125 if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
1126 return nil, err
1127 }
1128
1129 klog.V(4).InfoS("Using self-signed cert", "TLSCertFile", kc.TLSCertFile, "TLSPrivateKeyFile", kc.TLSPrivateKeyFile)
1130 }
1131 }
1132
1133 tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
1134 if err != nil {
1135 return nil, err
1136 }
1137
1138 if len(tlsCipherSuites) > 0 {
1139 insecureCiphers := cliflag.InsecureTLSCiphers()
1140 for i := 0; i < len(tlsCipherSuites); i++ {
1141 for cipherName, cipherID := range insecureCiphers {
1142 if tlsCipherSuites[i] == cipherID {
1143 klog.InfoS("Use of insecure cipher detected.", "cipher", cipherName)
1144 }
1145 }
1146 }
1147 }
1148
1149 minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
1150 if err != nil {
1151 return nil, err
1152 }
1153
1154 if minTLSVersion == tls.VersionTLS13 {
1155 if len(tlsCipherSuites) != 0 {
1156 klog.InfoS("Warning: TLS 1.3 cipher suites are not configurable, ignoring --tls-cipher-suites")
1157 }
1158 }
1159
1160 tlsOptions := &server.TLSOptions{
1161 Config: &tls.Config{
1162 MinVersion: minTLSVersion,
1163 CipherSuites: tlsCipherSuites,
1164 },
1165 CertFile: kc.TLSCertFile,
1166 KeyFile: kc.TLSPrivateKeyFile,
1167 }
1168
1169 if len(kc.Authentication.X509.ClientCAFile) > 0 {
1170 clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
1171 if err != nil {
1172 return nil, fmt.Errorf("unable to load client CA file %s: %w", kc.Authentication.X509.ClientCAFile, err)
1173 }
1174
1175 tlsOptions.Config.ClientCAs = clientCAs
1176
1177 tlsOptions.Config.ClientAuth = tls.RequestClientCert
1178 }
1179
1180 return tlsOptions, nil
1181 }
1182
1183
1184
1185 func setContentTypeForClient(cfg *restclient.Config, contentType string) {
1186 if len(contentType) == 0 {
1187 return
1188 }
1189 cfg.ContentType = contentType
1190 switch contentType {
1191 case runtime.ContentTypeProtobuf:
1192 cfg.AcceptContentTypes = strings.Join([]string{runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
1193 default:
1194
1195 }
1196 }
1197
1198
1199
1200
1201
1202
1203
1204
1205 func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
1206 hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
1207 if err != nil {
1208 return err
1209 }
1210
1211 nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
1212 if err != nil {
1213 return err
1214 }
1215 hostnameOverridden := len(kubeServer.HostnameOverride) > 0
1216
1217 makeEventRecorder(context.TODO(), kubeDeps, nodeName)
1218
1219 nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider)
1220 if err != nil {
1221 return fmt.Errorf("bad --node-ip %q: %v", kubeServer.NodeIP, err)
1222 }
1223
1224 capabilities.Initialize(capabilities.Capabilities{
1225 AllowPrivileged: true,
1226 })
1227
1228 credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
1229 klog.V(2).InfoS("Using root directory", "path", kubeServer.RootDirectory)
1230
1231 if kubeDeps.OSInterface == nil {
1232 kubeDeps.OSInterface = kubecontainer.RealOS{}
1233 }
1234
1235 k, err := createAndInitKubelet(kubeServer,
1236 kubeDeps,
1237 hostname,
1238 hostnameOverridden,
1239 nodeName,
1240 nodeIPs)
1241 if err != nil {
1242 return fmt.Errorf("failed to create kubelet: %w", err)
1243 }
1244
1245
1246
1247 if kubeDeps.PodConfig == nil {
1248 return fmt.Errorf("failed to create kubelet, pod source config was nil")
1249 }
1250 podCfg := kubeDeps.PodConfig
1251
1252 if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
1253 klog.ErrorS(err, "Failed to set rlimit on max file handles")
1254 }
1255
1256
1257 if runOnce {
1258 if _, err := k.RunOnce(podCfg.Updates()); err != nil {
1259 return fmt.Errorf("runonce failed: %w", err)
1260 }
1261 klog.InfoS("Started kubelet as runonce")
1262 } else {
1263 startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
1264 klog.InfoS("Started kubelet")
1265 }
1266 return nil
1267 }
1268
1269 func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
1270
1271 go k.Run(podCfg.Updates())
1272
1273
1274 if enableServer {
1275 go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
1276 }
1277 if kubeCfg.ReadOnlyPort > 0 {
1278 go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
1279 }
1280 go k.ListenAndServePodResources()
1281 }
1282
1283 func createAndInitKubelet(kubeServer *options.KubeletServer,
1284 kubeDeps *kubelet.Dependencies,
1285 hostname string,
1286 hostnameOverridden bool,
1287 nodeName types.NodeName,
1288 nodeIPs []net.IP) (k kubelet.Bootstrap, err error) {
1289
1290
1291
1292 k, err = kubelet.NewMainKubelet(&kubeServer.KubeletConfiguration,
1293 kubeDeps,
1294 &kubeServer.ContainerRuntimeOptions,
1295 hostname,
1296 hostnameOverridden,
1297 nodeName,
1298 nodeIPs,
1299 kubeServer.ProviderID,
1300 kubeServer.CloudProvider,
1301 kubeServer.CertDirectory,
1302 kubeServer.RootDirectory,
1303 kubeServer.PodLogsDir,
1304 kubeServer.ImageCredentialProviderConfigFile,
1305 kubeServer.ImageCredentialProviderBinDir,
1306 kubeServer.RegisterNode,
1307 kubeServer.RegisterWithTaints,
1308 kubeServer.AllowedUnsafeSysctls,
1309 kubeServer.ExperimentalMounterPath,
1310 kubeServer.KernelMemcgNotification,
1311 kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
1312 kubeServer.MinimumGCAge,
1313 kubeServer.MaxPerPodContainerCount,
1314 kubeServer.MaxContainerCount,
1315 kubeServer.RegisterSchedulable,
1316 kubeServer.KeepTerminatedPodVolumes,
1317 kubeServer.NodeLabels,
1318 kubeServer.NodeStatusMaxImages,
1319 kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault)
1320 if err != nil {
1321 return nil, err
1322 }
1323
1324 k.BirthCry()
1325
1326 k.StartGarbageCollection()
1327
1328 return k, nil
1329 }
1330
1331
1332
1333 func parseResourceList(m map[string]string) (v1.ResourceList, error) {
1334 if len(m) == 0 {
1335 return nil, nil
1336 }
1337 rl := make(v1.ResourceList)
1338 for k, v := range m {
1339 switch v1.ResourceName(k) {
1340
1341 case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceEphemeralStorage, pidlimit.PIDs:
1342 q, err := resource.ParseQuantity(v)
1343 if err != nil {
1344 return nil, fmt.Errorf("failed to parse quantity %q for %q resource: %w", v, k, err)
1345 }
1346 if q.Sign() == -1 {
1347 return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
1348 }
1349 rl[v1.ResourceName(k)] = q
1350 default:
1351 return nil, fmt.Errorf("cannot reserve %q resource", k)
1352 }
1353 }
1354 return rl, nil
1355 }
1356
1357 func newTracerProvider(s *options.KubeletServer) (oteltrace.TracerProvider, error) {
1358 if s.KubeletConfiguration.Tracing == nil {
1359 return oteltrace.NewNoopTracerProvider(), nil
1360 }
1361 hostname, err := nodeutil.GetHostname(s.HostnameOverride)
1362 if err != nil {
1363 return nil, fmt.Errorf("could not determine hostname for tracer provider: %w", err)
1364 }
1365 resourceOpts := []otelsdkresource.Option{
1366 otelsdkresource.WithAttributes(
1367 semconv.ServiceNameKey.String(componentKubelet),
1368 semconv.HostNameKey.String(hostname),
1369 ),
1370 }
1371 tp, err := tracing.NewProvider(context.Background(), s.KubeletConfiguration.Tracing, []otlptracegrpc.Option{}, resourceOpts)
1372 if err != nil {
1373 return nil, fmt.Errorf("could not configure tracer provider: %w", err)
1374 }
1375 return tp, nil
1376 }
1377
1378 func getCgroupDriverFromCRI(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
1379 klog.V(4).InfoS("Getting CRI runtime configuration information")
1380
1381 var (
1382 runtimeConfig *runtimeapi.RuntimeConfigResponse
1383 err error
1384 )
1385
1386
1387 for i := 0; i < 3; i++ {
1388 runtimeConfig, err = kubeDeps.RemoteRuntimeService.RuntimeConfig(ctx)
1389 if err != nil {
1390 s, ok := status.FromError(err)
1391 if !ok || s.Code() != codes.Unimplemented {
1392
1393
1394
1395 time.Sleep(time.Second * 2)
1396 continue
1397 }
1398
1399 klog.InfoS("CRI implementation should be updated to support RuntimeConfig when KubeletCgroupDriverFromCRI feature gate has been enabled. Falling back to using cgroupDriver from kubelet config.")
1400 return nil
1401 }
1402 }
1403 if err != nil {
1404 return err
1405 }
1406
1407
1408
1409 linuxConfig := runtimeConfig.GetLinux()
1410 if linuxConfig == nil {
1411 return nil
1412 }
1413
1414 switch d := linuxConfig.GetCgroupDriver(); d {
1415 case runtimeapi.CgroupDriver_SYSTEMD:
1416 s.CgroupDriver = "systemd"
1417 case runtimeapi.CgroupDriver_CGROUPFS:
1418 s.CgroupDriver = "cgroupfs"
1419 default:
1420 return fmt.Errorf("runtime returned an unknown cgroup driver %d", d)
1421 }
1422 klog.InfoS("Using cgroup driver setting received from the CRI runtime", "cgroupDriver", s.CgroupDriver)
1423 return nil
1424 }
1425
View as plain text