...

Source file src/k8s.io/kubernetes/cmd/kubelet/app/server.go

Documentation: k8s.io/kubernetes/cmd/kubelet/app

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package app makes it easy to create a kubelet server for various contexts.
    18  package app
    19  
    20  import (
    21  	"context"
    22  	"crypto/tls"
    23  	"encoding/json"
    24  	"errors"
    25  	"fmt"
    26  	"io"
    27  	"io/fs"
    28  	"math"
    29  	"net"
    30  	"net/http"
    31  	"os"
    32  	"path/filepath"
    33  	"strconv"
    34  	"strings"
    35  	"time"
    36  
    37  	"github.com/coreos/go-systemd/v22/daemon"
    38  	jsonpatch "github.com/evanphx/json-patch"
    39  	"github.com/spf13/cobra"
    40  	"github.com/spf13/pflag"
    41  	"google.golang.org/grpc/codes"
    42  	"google.golang.org/grpc/status"
    43  	"k8s.io/klog/v2"
    44  	"k8s.io/mount-utils"
    45  
    46  	cadvisorapi "github.com/google/cadvisor/info/v1"
    47  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    48  	otelsdkresource "go.opentelemetry.io/otel/sdk/resource"
    49  	semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
    50  	oteltrace "go.opentelemetry.io/otel/trace"
    51  	v1 "k8s.io/api/core/v1"
    52  	"k8s.io/apimachinery/pkg/api/resource"
    53  	"k8s.io/apimachinery/pkg/runtime"
    54  	"k8s.io/apimachinery/pkg/types"
    55  	utilnet "k8s.io/apimachinery/pkg/util/net"
    56  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    57  	"k8s.io/apimachinery/pkg/util/sets"
    58  	"k8s.io/apimachinery/pkg/util/validation/field"
    59  	"k8s.io/apimachinery/pkg/util/wait"
    60  	genericapiserver "k8s.io/apiserver/pkg/server"
    61  	"k8s.io/apiserver/pkg/server/healthz"
    62  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    63  	clientset "k8s.io/client-go/kubernetes"
    64  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    65  	restclient "k8s.io/client-go/rest"
    66  	"k8s.io/client-go/tools/clientcmd"
    67  	"k8s.io/client-go/tools/record"
    68  	certutil "k8s.io/client-go/util/cert"
    69  	"k8s.io/client-go/util/certificate"
    70  	"k8s.io/client-go/util/connrotation"
    71  	"k8s.io/client-go/util/keyutil"
    72  	cloudprovider "k8s.io/cloud-provider"
    73  	cliflag "k8s.io/component-base/cli/flag"
    74  	"k8s.io/component-base/configz"
    75  	"k8s.io/component-base/featuregate"
    76  	"k8s.io/component-base/logs"
    77  	logsapi "k8s.io/component-base/logs/api/v1"
    78  	"k8s.io/component-base/metrics"
    79  	"k8s.io/component-base/metrics/legacyregistry"
    80  	"k8s.io/component-base/tracing"
    81  	"k8s.io/component-base/version"
    82  	"k8s.io/component-base/version/verflag"
    83  	nodeutil "k8s.io/component-helpers/node/util"
    84  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    85  	kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
    86  	"k8s.io/kubernetes/cmd/kubelet/app/options"
    87  	"k8s.io/kubernetes/pkg/api/legacyscheme"
    88  	"k8s.io/kubernetes/pkg/capabilities"
    89  	"k8s.io/kubernetes/pkg/credentialprovider"
    90  	"k8s.io/kubernetes/pkg/features"
    91  	"k8s.io/kubernetes/pkg/kubelet"
    92  	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
    93  	kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
    94  	kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
    95  	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
    96  	kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
    97  	"k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
    98  	"k8s.io/kubernetes/pkg/kubelet/cm"
    99  	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
   100  	"k8s.io/kubernetes/pkg/kubelet/config"
   101  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
   102  	"k8s.io/kubernetes/pkg/kubelet/eviction"
   103  	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
   104  	"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
   105  	kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
   106  	"k8s.io/kubernetes/pkg/kubelet/server"
   107  	"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
   108  	kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
   109  	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
   110  	utilfs "k8s.io/kubernetes/pkg/util/filesystem"
   111  	"k8s.io/kubernetes/pkg/util/flock"
   112  	"k8s.io/kubernetes/pkg/util/oom"
   113  	"k8s.io/kubernetes/pkg/util/rlimit"
   114  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
   115  	"k8s.io/kubernetes/pkg/volume/util/subpath"
   116  	"k8s.io/utils/cpuset"
   117  	"k8s.io/utils/exec"
   118  	netutils "k8s.io/utils/net"
   119  )
   120  
   121  func init() {
   122  	utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
   123  }
   124  
   125  const (
   126  	// Kubelet component name
   127  	componentKubelet = "kubelet"
   128  )
   129  
   130  // NewKubeletCommand creates a *cobra.Command object with default parameters
   131  func NewKubeletCommand() *cobra.Command {
   132  	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
   133  	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
   134  	kubeletFlags := options.NewKubeletFlags()
   135  
   136  	kubeletConfig, err := options.NewKubeletConfiguration()
   137  	// programmer error
   138  	if err != nil {
   139  		klog.ErrorS(err, "Failed to create a new kubelet configuration")
   140  		os.Exit(1)
   141  	}
   142  
   143  	cmd := &cobra.Command{
   144  		Use: componentKubelet,
   145  		Long: `The kubelet is the primary "node agent" that runs on each
   146  node. It can register the node with the apiserver using one of: the hostname; a flag to
   147  override the hostname; or specific logic for a cloud provider.
   148  
   149  The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
   150  that describes a pod. The kubelet takes a set of PodSpecs that are provided through
   151  various mechanisms (primarily through the apiserver) and ensures that the containers
   152  described in those PodSpecs are running and healthy. The kubelet doesn't manage
   153  containers which were not created by Kubernetes.
   154  
   155  Other than from an PodSpec from the apiserver, there are two ways that a container
   156  manifest can be provided to the Kubelet.
   157  
   158  File: Path passed as a flag on the command line. Files under this path will be monitored
   159  periodically for updates. The monitoring period is 20s by default and is configurable
   160  via a flag.
   161  
   162  HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
   163  is checked every 20 seconds (also configurable with a flag).`,
   164  		// The Kubelet has special flag parsing requirements to enforce flag precedence rules,
   165  		// so we do all our parsing manually in Run, below.
   166  		// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
   167  		// `args` arg to Run, without Cobra's interference.
   168  		DisableFlagParsing: true,
   169  		SilenceUsage:       true,
   170  		RunE: func(cmd *cobra.Command, args []string) error {
   171  			// initial flag parse, since we disable cobra's flag parsing
   172  			if err := cleanFlagSet.Parse(args); err != nil {
   173  				return fmt.Errorf("failed to parse kubelet flag: %w", err)
   174  			}
   175  
   176  			// check if there are non-flag arguments in the command line
   177  			cmds := cleanFlagSet.Args()
   178  			if len(cmds) > 0 {
   179  				return fmt.Errorf("unknown command %+s", cmds[0])
   180  			}
   181  
   182  			// short-circuit on help
   183  			help, err := cleanFlagSet.GetBool("help")
   184  			if err != nil {
   185  				return errors.New(`"help" flag is non-bool, programmer error, please correct`)
   186  			}
   187  			if help {
   188  				return cmd.Help()
   189  			}
   190  
   191  			// short-circuit on verflag
   192  			verflag.PrintAndExitIfRequested()
   193  
   194  			// set feature gates from initial flags-based config
   195  			if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
   196  				return fmt.Errorf("failed to set feature gates from initial flags-based config: %w", err)
   197  			}
   198  
   199  			// validate the initial KubeletFlags
   200  			if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
   201  				return fmt.Errorf("failed to validate kubelet flags: %w", err)
   202  			}
   203  
   204  			if cleanFlagSet.Changed("pod-infra-container-image") {
   205  				klog.InfoS("--pod-infra-container-image will not be pruned by the image garbage collector in kubelet and should also be set in the remote runtime")
   206  				_ = cmd.Flags().MarkDeprecated("pod-infra-container-image", "--pod-infra-container-image will be removed in 1.30. Image garbage collector will get sandbox image information from CRI.")
   207  			}
   208  
   209  			// load kubelet config file, if provided
   210  			if len(kubeletFlags.KubeletConfigFile) > 0 {
   211  				kubeletConfig, err = loadConfigFile(kubeletFlags.KubeletConfigFile)
   212  				if err != nil {
   213  					return fmt.Errorf("failed to load kubelet config file, path: %s, error: %w", kubeletFlags.KubeletConfigFile, err)
   214  				}
   215  			}
   216  			// Merge the kubelet configurations if --config-dir is set
   217  			if len(kubeletFlags.KubeletDropinConfigDirectory) > 0 {
   218  				if err := mergeKubeletConfigurations(kubeletConfig, kubeletFlags.KubeletDropinConfigDirectory); err != nil {
   219  					return fmt.Errorf("failed to merge kubelet configs: %w", err)
   220  				}
   221  			}
   222  
   223  			if len(kubeletFlags.KubeletConfigFile) > 0 || len(kubeletFlags.KubeletDropinConfigDirectory) > 0 {
   224  				// We must enforce flag precedence by re-parsing the command line into the new object.
   225  				// This is necessary to preserve backwards-compatibility across binary upgrades.
   226  				// See issue #56171 for more details.
   227  				if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
   228  					return fmt.Errorf("failed to precedence kubeletConfigFlag: %w", err)
   229  				}
   230  				// update feature gates based on new config
   231  				if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
   232  					return fmt.Errorf("failed to set feature gates from initial flags-based config: %w", err)
   233  				}
   234  			}
   235  
   236  			// Config and flags parsed, now we can initialize logging.
   237  			logs.InitLogs()
   238  			if err := logsapi.ValidateAndApplyAsField(&kubeletConfig.Logging, utilfeature.DefaultFeatureGate, field.NewPath("logging")); err != nil {
   239  				return fmt.Errorf("initialize logging: %v", err)
   240  			}
   241  			cliflag.PrintFlags(cleanFlagSet)
   242  
   243  			// We always validate the local configuration (command line + config file).
   244  			// This is the default "last-known-good" config for dynamic config, and must always remain valid.
   245  			if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig, utilfeature.DefaultFeatureGate); err != nil {
   246  				return fmt.Errorf("failed to validate kubelet configuration, error: %w, path: %s", err, kubeletConfig)
   247  			}
   248  
   249  			if (kubeletConfig.KubeletCgroups != "" && kubeletConfig.KubeReservedCgroup != "") && (strings.Index(kubeletConfig.KubeletCgroups, kubeletConfig.KubeReservedCgroup) != 0) {
   250  				klog.InfoS("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
   251  			}
   252  
   253  			// construct a KubeletServer from kubeletFlags and kubeletConfig
   254  			kubeletServer := &options.KubeletServer{
   255  				KubeletFlags:         *kubeletFlags,
   256  				KubeletConfiguration: *kubeletConfig,
   257  			}
   258  
   259  			// use kubeletServer to construct the default KubeletDeps
   260  			kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
   261  			if err != nil {
   262  				return fmt.Errorf("failed to construct kubelet dependencies: %w", err)
   263  			}
   264  
   265  			if err := checkPermissions(); err != nil {
   266  				klog.ErrorS(err, "kubelet running with insufficient permissions")
   267  			}
   268  
   269  			// make the kubelet's config safe for logging
   270  			config := kubeletServer.KubeletConfiguration.DeepCopy()
   271  			for k := range config.StaticPodURLHeader {
   272  				config.StaticPodURLHeader[k] = []string{"<masked>"}
   273  			}
   274  			// log the kubelet's config for inspection
   275  			klog.V(5).InfoS("KubeletConfiguration", "configuration", klog.Format(config))
   276  
   277  			// set up signal context for kubelet shutdown
   278  			ctx := genericapiserver.SetupSignalContext()
   279  
   280  			utilfeature.DefaultMutableFeatureGate.AddMetrics()
   281  			// run the kubelet
   282  			return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
   283  		},
   284  	}
   285  
   286  	// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
   287  	kubeletFlags.AddFlags(cleanFlagSet)
   288  	options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
   289  	options.AddGlobalFlags(cleanFlagSet)
   290  	cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
   291  
   292  	// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
   293  	const usageFmt = "Usage:\n  %s\n\nFlags:\n%s"
   294  	cmd.SetUsageFunc(func(cmd *cobra.Command) error {
   295  		fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
   296  		return nil
   297  	})
   298  	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
   299  		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
   300  	})
   301  
   302  	return cmd
   303  }
   304  
   305  // mergeKubeletConfigurations merges the provided drop-in configurations with the base kubelet configuration.
   306  // The drop-in configurations are processed in lexical order based on the file names. This means that the
   307  // configurations in files with lower numeric prefixes are applied first, followed by higher numeric prefixes.
   308  // For example, if the drop-in directory contains files named "10-config.conf" and "20-config.conf",
   309  // the configurations in "10-config.conf" will be applied first, and then the configurations in "20-config.conf" will be applied,
   310  // potentially overriding the previous values.
   311  func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConfiguration, kubeletDropInConfigDir string) error {
   312  	const dropinFileExtension = ".conf"
   313  	baseKubeletConfigJSON, err := json.Marshal(kubeletConfig)
   314  	if err != nil {
   315  		return fmt.Errorf("failed to marshal base config: %w", err)
   316  	}
   317  	// Walk through the drop-in directory and update the configuration for each file
   318  	if err := filepath.WalkDir(kubeletDropInConfigDir, func(path string, info fs.DirEntry, err error) error {
   319  		if err != nil {
   320  			return err
   321  		}
   322  		if !info.IsDir() && filepath.Ext(info.Name()) == dropinFileExtension {
   323  			dropinConfigJSON, err := loadDropinConfigFileIntoJSON(path)
   324  			if err != nil {
   325  				return fmt.Errorf("failed to load kubelet dropin file, path: %s, error: %w", path, err)
   326  			}
   327  			mergedConfigJSON, err := jsonpatch.MergePatch(baseKubeletConfigJSON, dropinConfigJSON)
   328  			if err != nil {
   329  				return fmt.Errorf("failed to merge drop-in and current config: %w", err)
   330  			}
   331  			baseKubeletConfigJSON = mergedConfigJSON
   332  		}
   333  		return nil
   334  	}); err != nil {
   335  		return fmt.Errorf("failed to walk through kubelet dropin directory %q: %w", kubeletDropInConfigDir, err)
   336  	}
   337  
   338  	if err := json.Unmarshal(baseKubeletConfigJSON, kubeletConfig); err != nil {
   339  		return fmt.Errorf("failed to unmarshal merged JSON into kubelet configuration: %w", err)
   340  	}
   341  	return nil
   342  }
   343  
   344  // newFlagSetWithGlobals constructs a new pflag.FlagSet with global flags registered
   345  // on it.
   346  func newFlagSetWithGlobals() *pflag.FlagSet {
   347  	fs := pflag.NewFlagSet("", pflag.ExitOnError)
   348  	// set the normalize func, similar to k8s.io/component-base/cli//flags.go:InitFlags
   349  	fs.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
   350  	// explicitly add flags from libs that register global flags
   351  	options.AddGlobalFlags(fs)
   352  	return fs
   353  }
   354  
   355  // newFakeFlagSet constructs a pflag.FlagSet with the same flags as fs, but where
   356  // all values have noop Set implementations
   357  func newFakeFlagSet(fs *pflag.FlagSet) *pflag.FlagSet {
   358  	ret := pflag.NewFlagSet("", pflag.ExitOnError)
   359  	ret.SetNormalizeFunc(fs.GetNormalizeFunc())
   360  	fs.VisitAll(func(f *pflag.Flag) {
   361  		ret.VarP(cliflag.NoOp{}, f.Name, f.Shorthand, f.Usage)
   362  	})
   363  	return ret
   364  }
   365  
   366  // kubeletConfigFlagPrecedence re-parses flags over the KubeletConfiguration object.
   367  // We must enforce flag precedence by re-parsing the command line into the new object.
   368  // This is necessary to preserve backwards-compatibility across binary upgrades.
   369  // See issue #56171 for more details.
   370  func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration, args []string) error {
   371  	// We use a throwaway kubeletFlags and a fake global flagset to avoid double-parses,
   372  	// as some Set implementations accumulate values from multiple flag invocations.
   373  	fs := newFakeFlagSet(newFlagSetWithGlobals())
   374  	// register throwaway KubeletFlags
   375  	options.NewKubeletFlags().AddFlags(fs)
   376  	// register new KubeletConfiguration
   377  	options.AddKubeletConfigFlags(fs, kc)
   378  	// Remember original feature gates, so we can merge with flag gates later
   379  	original := kc.FeatureGates
   380  	// avoid duplicate printing the flag deprecation warnings during re-parsing
   381  	fs.SetOutput(io.Discard)
   382  	// re-parse flags
   383  	if err := fs.Parse(args); err != nil {
   384  		return err
   385  	}
   386  	// Add back feature gates that were set in the original kc, but not in flags
   387  	for k, v := range original {
   388  		if _, ok := kc.FeatureGates[k]; !ok {
   389  			kc.FeatureGates[k] = v
   390  		}
   391  	}
   392  	return nil
   393  }
   394  
   395  func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
   396  	const errFmt = "failed to load Kubelet config file %s, error %v"
   397  	// compute absolute path based on current working dir
   398  	kubeletConfigFile, err := filepath.Abs(name)
   399  	if err != nil {
   400  		return nil, fmt.Errorf(errFmt, name, err)
   401  	}
   402  	loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile)
   403  	if err != nil {
   404  		return nil, fmt.Errorf(errFmt, name, err)
   405  	}
   406  	kc, err := loader.Load()
   407  	if err != nil {
   408  		return nil, fmt.Errorf(errFmt, name, err)
   409  	}
   410  
   411  	// EvictionHard may be nil if it was not set in kubelet's config file.
   412  	// EvictionHard can have OS-specific fields, which is why there's no default value for it.
   413  	// See: https://github.com/kubernetes/kubernetes/pull/110263
   414  	if kc.EvictionHard == nil {
   415  		kc.EvictionHard = eviction.DefaultEvictionHard
   416  	}
   417  	return kc, err
   418  }
   419  
   420  func loadDropinConfigFileIntoJSON(name string) ([]byte, error) {
   421  	const errFmt = "failed to load drop-in kubelet config file %s, error %v"
   422  	// compute absolute path based on current working dir
   423  	kubeletConfigFile, err := filepath.Abs(name)
   424  	if err != nil {
   425  		return nil, fmt.Errorf(errFmt, name, err)
   426  	}
   427  	loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile)
   428  	if err != nil {
   429  		return nil, fmt.Errorf(errFmt, name, err)
   430  	}
   431  	return loader.LoadIntoJSON()
   432  }
   433  
   434  // UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
   435  // is not valid.  It will not start any background processes, and does not include authentication/authorization
   436  func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
   437  	// Initialize the TLS Options
   438  	tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
   439  	if err != nil {
   440  		return nil, err
   441  	}
   442  
   443  	mounter := mount.New(s.ExperimentalMounterPath)
   444  	subpather := subpath.New(mounter)
   445  	hu := hostutil.NewHostUtil()
   446  	var pluginRunner = exec.New()
   447  
   448  	plugins, err := ProbeVolumePlugins(featureGate)
   449  	if err != nil {
   450  		return nil, err
   451  	}
   452  	tp := oteltrace.NewNoopTracerProvider()
   453  	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
   454  		tp, err = newTracerProvider(s)
   455  		if err != nil {
   456  			return nil, err
   457  		}
   458  	}
   459  	return &kubelet.Dependencies{
   460  		Auth:                nil, // default does not enforce auth[nz]
   461  		CAdvisorInterface:   nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
   462  		Cloud:               nil, // cloud provider might start background processes
   463  		ContainerManager:    nil,
   464  		KubeClient:          nil,
   465  		HeartbeatClient:     nil,
   466  		EventClient:         nil,
   467  		TracerProvider:      tp,
   468  		HostUtil:            hu,
   469  		Mounter:             mounter,
   470  		Subpather:           subpather,
   471  		OOMAdjuster:         oom.NewOOMAdjuster(),
   472  		OSInterface:         kubecontainer.RealOS{},
   473  		VolumePlugins:       plugins,
   474  		DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
   475  		TLSOptions:          tlsOptions}, nil
   476  }
   477  
   478  // Run runs the specified KubeletServer with the given Dependencies. This should never exit.
   479  // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
   480  // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
   481  // not be generated.
   482  func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
   483  	// To help debugging, immediately log version
   484  	klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
   485  
   486  	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
   487  
   488  	if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
   489  		return fmt.Errorf("failed OS init: %w", err)
   490  	}
   491  	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
   492  		return fmt.Errorf("failed to run Kubelet: %w", err)
   493  	}
   494  	return nil
   495  }
   496  
   497  func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
   498  	scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
   499  	if err != nil {
   500  		return err
   501  	}
   502  	versioned := kubeletconfigv1beta1.KubeletConfiguration{}
   503  	if err := scheme.Convert(kc, &versioned, nil); err != nil {
   504  		return err
   505  	}
   506  	cz.Set(versioned)
   507  	return nil
   508  }
   509  
   510  func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
   511  	cz, err := configz.New("kubeletconfig")
   512  	if err != nil {
   513  		klog.ErrorS(err, "Failed to register configz")
   514  		return err
   515  	}
   516  	if err := setConfigz(cz, kc); err != nil {
   517  		klog.ErrorS(err, "Failed to register config")
   518  		return err
   519  	}
   520  	return nil
   521  }
   522  
   523  // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
   524  func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
   525  	if kubeDeps.Recorder != nil {
   526  		return
   527  	}
   528  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   529  	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
   530  	eventBroadcaster.StartStructuredLogging(3)
   531  	if kubeDeps.EventClient != nil {
   532  		klog.V(4).InfoS("Sending events to api server")
   533  		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
   534  	} else {
   535  		klog.InfoS("No api server defined - no events will be sent to API server")
   536  	}
   537  }
   538  
   539  func getReservedCPUs(machineInfo *cadvisorapi.MachineInfo, cpus string) (cpuset.CPUSet, error) {
   540  	emptyCPUSet := cpuset.New()
   541  
   542  	if cpus == "" {
   543  		return emptyCPUSet, nil
   544  	}
   545  
   546  	topo, err := topology.Discover(machineInfo)
   547  	if err != nil {
   548  		return emptyCPUSet, fmt.Errorf("unable to discover CPU topology info: %s", err)
   549  	}
   550  	reservedCPUSet, err := cpuset.Parse(cpus)
   551  	if err != nil {
   552  		return emptyCPUSet, fmt.Errorf("unable to parse reserved-cpus list: %s", err)
   553  	}
   554  	allCPUSet := topo.CPUDetails.CPUs()
   555  	if !reservedCPUSet.IsSubsetOf(allCPUSet) {
   556  		return emptyCPUSet, fmt.Errorf("reserved-cpus: %s is not a subset of online-cpus: %s", cpus, allCPUSet.String())
   557  	}
   558  	return reservedCPUSet, nil
   559  }
   560  
   561  func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
   562  	// Set global feature gates based on the value on the initial KubeletServer
   563  	err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
   564  	if err != nil {
   565  		return err
   566  	}
   567  	// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
   568  	if err := options.ValidateKubeletServer(s); err != nil {
   569  		return err
   570  	}
   571  
   572  	// Warn if MemoryQoS enabled with cgroups v1
   573  	if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
   574  		!isCgroup2UnifiedMode() {
   575  		klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
   576  	}
   577  	// Obtain Kubelet Lock File
   578  	if s.ExitOnLockContention && s.LockFilePath == "" {
   579  		return errors.New("cannot exit on lock file contention: no lock file specified")
   580  	}
   581  	done := make(chan struct{})
   582  	if s.LockFilePath != "" {
   583  		klog.InfoS("Acquiring file lock", "path", s.LockFilePath)
   584  		if err := flock.Acquire(s.LockFilePath); err != nil {
   585  			return fmt.Errorf("unable to acquire file lock on %q: %w", s.LockFilePath, err)
   586  		}
   587  		if s.ExitOnLockContention {
   588  			klog.InfoS("Watching for inotify events", "path", s.LockFilePath)
   589  			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
   590  				return err
   591  			}
   592  		}
   593  	}
   594  
   595  	// Register current configuration with /configz endpoint
   596  	err = initConfigz(&s.KubeletConfiguration)
   597  	if err != nil {
   598  		klog.ErrorS(err, "Failed to register kubelet configuration with configz")
   599  	}
   600  
   601  	if len(s.ShowHiddenMetricsForVersion) > 0 {
   602  		metrics.SetShowHidden()
   603  	}
   604  
   605  	// About to get clients and such, detect standaloneMode
   606  	standaloneMode := true
   607  	if len(s.KubeConfig) > 0 {
   608  		standaloneMode = false
   609  	}
   610  
   611  	if kubeDeps == nil {
   612  		kubeDeps, err = UnsecuredDependencies(s, featureGate)
   613  		if err != nil {
   614  			return err
   615  		}
   616  	}
   617  
   618  	if kubeDeps.Cloud == nil {
   619  		if !cloudprovider.IsExternal(s.CloudProvider) {
   620  			cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
   621  			cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
   622  			if err != nil {
   623  				return err
   624  			}
   625  			if cloud != nil {
   626  				klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
   627  			}
   628  			kubeDeps.Cloud = cloud
   629  		}
   630  	}
   631  
   632  	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
   633  	if err != nil {
   634  		return err
   635  	}
   636  	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
   637  	if err != nil {
   638  		return err
   639  	}
   640  
   641  	// if in standalone mode, indicate as much by setting all clients to nil
   642  	switch {
   643  	case standaloneMode:
   644  		kubeDeps.KubeClient = nil
   645  		kubeDeps.EventClient = nil
   646  		kubeDeps.HeartbeatClient = nil
   647  		klog.InfoS("Standalone mode, no API client")
   648  
   649  	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
   650  		clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
   651  		if err != nil {
   652  			return err
   653  		}
   654  		if onHeartbeatFailure == nil {
   655  			return errors.New("onHeartbeatFailure must be a valid function other than nil")
   656  		}
   657  		kubeDeps.OnHeartbeatFailure = onHeartbeatFailure
   658  
   659  		kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
   660  		if err != nil {
   661  			return fmt.Errorf("failed to initialize kubelet client: %w", err)
   662  		}
   663  
   664  		// make a separate client for events
   665  		eventClientConfig := *clientConfig
   666  		eventClientConfig.QPS = float32(s.EventRecordQPS)
   667  		eventClientConfig.Burst = int(s.EventBurst)
   668  		kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
   669  		if err != nil {
   670  			return fmt.Errorf("failed to initialize kubelet event client: %w", err)
   671  		}
   672  
   673  		// make a separate client for heartbeat with throttling disabled and a timeout attached
   674  		heartbeatClientConfig := *clientConfig
   675  		heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
   676  		// The timeout is the minimum of the lease duration and status update frequency
   677  		leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
   678  		if heartbeatClientConfig.Timeout > leaseTimeout {
   679  			heartbeatClientConfig.Timeout = leaseTimeout
   680  		}
   681  
   682  		heartbeatClientConfig.QPS = float32(-1)
   683  		kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
   684  		if err != nil {
   685  			return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
   686  		}
   687  	}
   688  
   689  	if kubeDeps.Auth == nil {
   690  		auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
   691  		if err != nil {
   692  			return err
   693  		}
   694  		kubeDeps.Auth = auth
   695  		runAuthenticatorCAReload(ctx.Done())
   696  	}
   697  
   698  	if err := kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps); err != nil {
   699  		return err
   700  	}
   701  
   702  	// Get cgroup driver setting from CRI
   703  	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletCgroupDriverFromCRI) {
   704  		if err := getCgroupDriverFromCRI(ctx, s, kubeDeps); err != nil {
   705  			return err
   706  		}
   707  	}
   708  
   709  	var cgroupRoots []string
   710  	nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
   711  	cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
   712  	kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
   713  	if err != nil {
   714  		klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
   715  	} else if kubeletCgroup != "" {
   716  		cgroupRoots = append(cgroupRoots, kubeletCgroup)
   717  	}
   718  
   719  	if s.RuntimeCgroups != "" {
   720  		// RuntimeCgroups is optional, so ignore if it isn't specified
   721  		cgroupRoots = append(cgroupRoots, s.RuntimeCgroups)
   722  	}
   723  
   724  	if s.SystemCgroups != "" {
   725  		// SystemCgroups is optional, so ignore if it isn't specified
   726  		cgroupRoots = append(cgroupRoots, s.SystemCgroups)
   727  	}
   728  
   729  	if kubeDeps.CAdvisorInterface == nil {
   730  		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntimeEndpoint)
   731  		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntimeEndpoint), s.LocalStorageCapacityIsolation)
   732  		if err != nil {
   733  			return err
   734  		}
   735  	}
   736  
   737  	// Setup event recorder if required.
   738  	makeEventRecorder(ctx, kubeDeps, nodeName)
   739  
   740  	if kubeDeps.ContainerManager == nil {
   741  		if s.CgroupsPerQOS && s.CgroupRoot == "" {
   742  			klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
   743  			s.CgroupRoot = "/"
   744  		}
   745  
   746  		machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
   747  		if err != nil {
   748  			return err
   749  		}
   750  		reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
   751  		if err != nil {
   752  			return err
   753  		}
   754  		if reservedSystemCPUs.Size() > 0 {
   755  			// at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
   756  			klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReserved", s.KubeReserved, "systemReserved", s.SystemReserved)
   757  			if s.KubeReserved != nil {
   758  				delete(s.KubeReserved, "cpu")
   759  			}
   760  			if s.SystemReserved == nil {
   761  				s.SystemReserved = make(map[string]string)
   762  			}
   763  			s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
   764  			klog.InfoS("After cpu setting is overwritten", "kubeReserved", s.KubeReserved, "systemReserved", s.SystemReserved)
   765  		}
   766  
   767  		kubeReserved, err := parseResourceList(s.KubeReserved)
   768  		if err != nil {
   769  			return fmt.Errorf("--kube-reserved value failed to parse: %w", err)
   770  		}
   771  		systemReserved, err := parseResourceList(s.SystemReserved)
   772  		if err != nil {
   773  			return fmt.Errorf("--system-reserved value failed to parse: %w", err)
   774  		}
   775  		var hardEvictionThresholds []evictionapi.Threshold
   776  		// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
   777  		if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
   778  			hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
   779  			if err != nil {
   780  				return err
   781  			}
   782  		}
   783  		experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
   784  		if err != nil {
   785  			return fmt.Errorf("--qos-reserved value failed to parse: %w", err)
   786  		}
   787  
   788  		var cpuManagerPolicyOptions map[string]string
   789  		if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
   790  			cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
   791  		} else if s.CPUManagerPolicyOptions != nil {
   792  			return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
   793  				s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
   794  		}
   795  
   796  		var topologyManagerPolicyOptions map[string]string
   797  		if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManagerPolicyOptions) {
   798  			topologyManagerPolicyOptions = s.TopologyManagerPolicyOptions
   799  		} else if s.TopologyManagerPolicyOptions != nil {
   800  			return fmt.Errorf("topology manager policy options %v require feature gates %q enabled",
   801  				s.TopologyManagerPolicyOptions, features.TopologyManagerPolicyOptions)
   802  		}
   803  		if utilfeature.DefaultFeatureGate.Enabled(features.NodeSwap) {
   804  			if !isCgroup2UnifiedMode() && s.MemorySwap.SwapBehavior == kubelettypes.LimitedSwap {
   805  				// This feature is not supported for cgroupv1 so we are failing early.
   806  				return fmt.Errorf("swap feature is enabled and LimitedSwap but it is only supported with cgroupv2")
   807  			}
   808  			if !s.FailSwapOn && s.MemorySwap.SwapBehavior == "" {
   809  				// This is just a log because we are using a default of NoSwap.
   810  				klog.InfoS("NoSwap is set due to memorySwapBehavior not specified", "memorySwapBehavior", s.MemorySwap.SwapBehavior, "FailSwapOn", s.FailSwapOn)
   811  			}
   812  		}
   813  
   814  		kubeDeps.ContainerManager, err = cm.NewContainerManager(
   815  			kubeDeps.Mounter,
   816  			kubeDeps.CAdvisorInterface,
   817  			cm.NodeConfig{
   818  				NodeName:              nodeName,
   819  				RuntimeCgroupsName:    s.RuntimeCgroups,
   820  				SystemCgroupsName:     s.SystemCgroups,
   821  				KubeletCgroupsName:    s.KubeletCgroups,
   822  				KubeletOOMScoreAdj:    s.OOMScoreAdj,
   823  				CgroupsPerQOS:         s.CgroupsPerQOS,
   824  				CgroupRoot:            s.CgroupRoot,
   825  				CgroupDriver:          s.CgroupDriver,
   826  				KubeletRootDir:        s.RootDirectory,
   827  				ProtectKernelDefaults: s.ProtectKernelDefaults,
   828  				NodeAllocatableConfig: cm.NodeAllocatableConfig{
   829  					KubeReservedCgroupName:   s.KubeReservedCgroup,
   830  					SystemReservedCgroupName: s.SystemReservedCgroup,
   831  					EnforceNodeAllocatable:   sets.New(s.EnforceNodeAllocatable...),
   832  					KubeReserved:             kubeReserved,
   833  					SystemReserved:           systemReserved,
   834  					ReservedSystemCPUs:       reservedSystemCPUs,
   835  					HardEvictionThresholds:   hardEvictionThresholds,
   836  				},
   837  				QOSReserved:                             *experimentalQOSReserved,
   838  				CPUManagerPolicy:                        s.CPUManagerPolicy,
   839  				CPUManagerPolicyOptions:                 cpuManagerPolicyOptions,
   840  				CPUManagerReconcilePeriod:               s.CPUManagerReconcilePeriod.Duration,
   841  				ExperimentalMemoryManagerPolicy:         s.MemoryManagerPolicy,
   842  				ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
   843  				PodPidsLimit:                            s.PodPidsLimit,
   844  				EnforceCPULimits:                        s.CPUCFSQuota,
   845  				CPUCFSQuotaPeriod:                       s.CPUCFSQuotaPeriod.Duration,
   846  				TopologyManagerPolicy:                   s.TopologyManagerPolicy,
   847  				TopologyManagerScope:                    s.TopologyManagerScope,
   848  				TopologyManagerPolicyOptions:            topologyManagerPolicyOptions,
   849  			},
   850  			s.FailSwapOn,
   851  			kubeDeps.Recorder,
   852  			kubeDeps.KubeClient,
   853  		)
   854  
   855  		if err != nil {
   856  			return err
   857  		}
   858  	}
   859  
   860  	if kubeDeps.PodStartupLatencyTracker == nil {
   861  		kubeDeps.PodStartupLatencyTracker = kubeletutil.NewPodStartupLatencyTracker()
   862  	}
   863  
   864  	if kubeDeps.NodeStartupLatencyTracker == nil {
   865  		kubeDeps.NodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
   866  	}
   867  
   868  	// TODO(vmarmol): Do this through container config.
   869  	oomAdjuster := kubeDeps.OOMAdjuster
   870  	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
   871  		klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
   872  	}
   873  
   874  	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
   875  		return err
   876  	}
   877  
   878  	if s.HealthzPort > 0 {
   879  		mux := http.NewServeMux()
   880  		healthz.InstallHandler(mux)
   881  		go wait.Until(func() {
   882  			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
   883  			if err != nil {
   884  				klog.ErrorS(err, "Failed to start healthz server")
   885  			}
   886  		}, 5*time.Second, wait.NeverStop)
   887  	}
   888  
   889  	if s.RunOnce {
   890  		return nil
   891  	}
   892  
   893  	// If systemd is used, notify it that we have started
   894  	go daemon.SdNotify(false, "READY=1")
   895  
   896  	select {
   897  	case <-done:
   898  		break
   899  	case <-ctx.Done():
   900  		break
   901  	}
   902  
   903  	return nil
   904  }
   905  
   906  // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
   907  // bootstrapping is enabled or client certificate rotation is enabled.
   908  func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp oteltrace.TracerProvider, nodeName types.NodeName) (*restclient.Config, func(), error) {
   909  	if s.RotateCertificates {
   910  		// Rules for client rotation and the handling of kube config files:
   911  		//
   912  		// 1. If the client provides only a kubeconfig file, we must use that as the initial client
   913  		//    kubeadm needs the initial data in the kubeconfig to be placed into the cert store
   914  		// 2. If the client provides only an initial bootstrap kubeconfig file, we must create a
   915  		//    kubeconfig file at the target location that points to the cert store, but until
   916  		//    the file is present the client config will have no certs
   917  		// 3. If the client provides both and the kubeconfig is valid, we must ignore the bootstrap
   918  		//    kubeconfig.
   919  		// 4. If the client provides both and the kubeconfig is expired or otherwise invalid, we must
   920  		//    replace the kubeconfig with a new file that points to the cert dir
   921  		//
   922  		// The desired configuration for bootstrapping is to use a bootstrap kubeconfig and to have
   923  		// the kubeconfig file be managed by this process. For backwards compatibility with kubeadm,
   924  		// which provides a high powered kubeconfig on the master with cert/key data, we must
   925  		// bootstrap the cert manager with the contents of the initial client config.
   926  
   927  		klog.InfoS("Client rotation is on, will bootstrap in background")
   928  		certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
   929  		if err != nil {
   930  			return nil, nil, err
   931  		}
   932  
   933  		// use the correct content type for cert rotation, but don't set QPS
   934  		setContentTypeForClient(certConfig, s.ContentType)
   935  
   936  		kubeClientConfigOverrides(s, clientConfig)
   937  
   938  		clientCertificateManager, err := buildClientCertificateManager(certConfig, clientConfig, s.CertDirectory, nodeName)
   939  		if err != nil {
   940  			return nil, nil, err
   941  		}
   942  
   943  		legacyregistry.RawMustRegister(metrics.NewGaugeFunc(
   944  			&metrics.GaugeOpts{
   945  				Subsystem: kubeletmetrics.KubeletSubsystem,
   946  				Name:      "certificate_manager_client_ttl_seconds",
   947  				Help: "Gauge of the TTL (time-to-live) of the Kubelet's client certificate. " +
   948  					"The value is in seconds until certificate expiry (negative if already expired). " +
   949  					"If client certificate is invalid or unused, the value will be +INF.",
   950  				StabilityLevel: metrics.ALPHA,
   951  			},
   952  			func() float64 {
   953  				if c := clientCertificateManager.Current(); c != nil && c.Leaf != nil {
   954  					return math.Trunc(time.Until(c.Leaf.NotAfter).Seconds())
   955  				}
   956  				return math.Inf(1)
   957  			},
   958  		))
   959  
   960  		// the rotating transport will use the cert from the cert manager instead of these files
   961  		transportConfig := restclient.AnonymousClientConfig(clientConfig)
   962  
   963  		// we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
   964  		// to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
   965  		// or the bootstrapping credentials to potentially lay down new initial config.
   966  		closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, transportConfig, clientCertificateManager, 5*time.Minute)
   967  		if err != nil {
   968  			return nil, nil, err
   969  		}
   970  		var onHeartbeatFailure func()
   971  		// Kubelet needs to be able to recover from stale http connections.
   972  		// HTTP2 has a mechanism to detect broken connections by sending periodical pings.
   973  		// HTTP1 only can have one persistent connection, and it will close all Idle connections
   974  		// once the Kubelet heartbeat fails. However, since there are many edge cases that we can't
   975  		// control, users can still opt-in to the previous behavior for closing the connections by
   976  		// setting the environment variable DISABLE_HTTP2.
   977  		if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
   978  			klog.InfoS("HTTP2 has been explicitly disabled, Kubelet will forcefully close active connections on heartbeat failures")
   979  			onHeartbeatFailure = closeAllConns
   980  		} else {
   981  			onHeartbeatFailure = func() { utilnet.CloseIdleConnectionsFor(transportConfig.Transport) }
   982  		}
   983  
   984  		klog.V(2).InfoS("Starting client certificate rotation")
   985  		clientCertificateManager.Start()
   986  
   987  		return transportConfig, onHeartbeatFailure, nil
   988  	}
   989  
   990  	if len(s.BootstrapKubeconfig) > 0 {
   991  		if err := bootstrap.LoadClientCert(ctx, s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
   992  			return nil, nil, err
   993  		}
   994  	}
   995  
   996  	clientConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
   997  		&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig},
   998  		&clientcmd.ConfigOverrides{},
   999  	).ClientConfig()
  1000  	if err != nil {
  1001  		return nil, nil, fmt.Errorf("invalid kubeconfig: %w", err)
  1002  	}
  1003  
  1004  	kubeClientConfigOverrides(s, clientConfig)
  1005  	// Kubelet needs to be able to recover from stale http connections.
  1006  	// HTTP2 has a mechanism to detect broken connections by sending periodical pings.
  1007  	// HTTP1 only can have one persistent connection, and it will close all Idle connections
  1008  	// once the Kubelet heartbeat fails. However, since there are many edge cases that we can't
  1009  	// control, users can still opt-in to the previous behavior for closing the connections by
  1010  	// setting the environment variable DISABLE_HTTP2.
  1011  	var onHeartbeatFailure func()
  1012  	if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
  1013  		klog.InfoS("HTTP2 has been explicitly disabled, updating Kubelet client Dialer to forcefully close active connections on heartbeat failures")
  1014  		onHeartbeatFailure, err = updateDialer(clientConfig)
  1015  		if err != nil {
  1016  			return nil, nil, err
  1017  		}
  1018  	} else {
  1019  		onHeartbeatFailure = func() {
  1020  			utilnet.CloseIdleConnectionsFor(clientConfig.Transport)
  1021  		}
  1022  	}
  1023  	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
  1024  		clientConfig.Wrap(tracing.WrapperFor(tp))
  1025  	}
  1026  	return clientConfig, onHeartbeatFailure, nil
  1027  }
  1028  
  1029  // updateDialer instruments a restconfig with a dial. the returned function allows forcefully closing all active connections.
  1030  func updateDialer(clientConfig *restclient.Config) (func(), error) {
  1031  	if clientConfig.Transport != nil || clientConfig.Dial != nil {
  1032  		return nil, fmt.Errorf("there is already a transport or dialer configured")
  1033  	}
  1034  	d := connrotation.NewDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext)
  1035  	clientConfig.Dial = d.DialContext
  1036  	return d.CloseAll, nil
  1037  }
  1038  
  1039  // buildClientCertificateManager creates a certificate manager that will use certConfig to request a client certificate
  1040  // if no certificate is available, or the most recent clientConfig (which is assumed to point to the cert that the manager will
  1041  // write out).
  1042  func buildClientCertificateManager(certConfig, clientConfig *restclient.Config, certDir string, nodeName types.NodeName) (certificate.Manager, error) {
  1043  	newClientsetFn := func(current *tls.Certificate) (clientset.Interface, error) {
  1044  		// If we have a valid certificate, use that to fetch CSRs. Otherwise use the bootstrap
  1045  		// credentials. In the future it would be desirable to change the behavior of bootstrap
  1046  		// to always fall back to the external bootstrap credentials when such credentials are
  1047  		// provided by a fundamental trust system like cloud VM identity or an HSM module.
  1048  		config := certConfig
  1049  		if current != nil {
  1050  			config = clientConfig
  1051  		}
  1052  		return clientset.NewForConfig(config)
  1053  	}
  1054  
  1055  	return kubeletcertificate.NewKubeletClientCertificateManager(
  1056  		certDir,
  1057  		nodeName,
  1058  
  1059  		// this preserves backwards compatibility with kubeadm which passes
  1060  		// a high powered certificate to the kubelet as --kubeconfig and expects
  1061  		// it to be rotated out immediately
  1062  		clientConfig.CertData,
  1063  		clientConfig.KeyData,
  1064  
  1065  		clientConfig.CertFile,
  1066  		clientConfig.KeyFile,
  1067  		newClientsetFn,
  1068  	)
  1069  }
  1070  
  1071  func kubeClientConfigOverrides(s *options.KubeletServer, clientConfig *restclient.Config) {
  1072  	setContentTypeForClient(clientConfig, s.ContentType)
  1073  	// Override kubeconfig qps/burst settings from flags
  1074  	clientConfig.QPS = float32(s.KubeAPIQPS)
  1075  	clientConfig.Burst = int(s.KubeAPIBurst)
  1076  }
  1077  
  1078  // getNodeName returns the node name according to the cloud provider
  1079  // if cloud provider is specified. Otherwise, returns the hostname of the node.
  1080  func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
  1081  	if cloud == nil {
  1082  		return types.NodeName(hostname), nil
  1083  	}
  1084  
  1085  	instances, ok := cloud.Instances()
  1086  	if !ok {
  1087  		return "", fmt.Errorf("failed to get instances from cloud provider")
  1088  	}
  1089  
  1090  	nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
  1091  	if err != nil {
  1092  		return "", fmt.Errorf("error fetching current node name from cloud provider: %w", err)
  1093  	}
  1094  
  1095  	klog.V(2).InfoS("Cloud provider determined current node", "nodeName", klog.KRef("", string(nodeName)))
  1096  
  1097  	return nodeName, nil
  1098  }
  1099  
  1100  // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
  1101  // certificate and key file are generated. Returns a configured server.TLSOptions object.
  1102  func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
  1103  	if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
  1104  		kc.TLSCertFile = filepath.Join(kf.CertDirectory, "kubelet.crt")
  1105  		kc.TLSPrivateKeyFile = filepath.Join(kf.CertDirectory, "kubelet.key")
  1106  
  1107  		canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
  1108  		if err != nil {
  1109  			return nil, err
  1110  		}
  1111  		if !canReadCertAndKey {
  1112  			hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
  1113  			if err != nil {
  1114  				return nil, err
  1115  			}
  1116  			cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
  1117  			if err != nil {
  1118  				return nil, fmt.Errorf("unable to generate self signed cert: %w", err)
  1119  			}
  1120  
  1121  			if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
  1122  				return nil, err
  1123  			}
  1124  
  1125  			if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
  1126  				return nil, err
  1127  			}
  1128  
  1129  			klog.V(4).InfoS("Using self-signed cert", "TLSCertFile", kc.TLSCertFile, "TLSPrivateKeyFile", kc.TLSPrivateKeyFile)
  1130  		}
  1131  	}
  1132  
  1133  	tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
  1134  	if err != nil {
  1135  		return nil, err
  1136  	}
  1137  
  1138  	if len(tlsCipherSuites) > 0 {
  1139  		insecureCiphers := cliflag.InsecureTLSCiphers()
  1140  		for i := 0; i < len(tlsCipherSuites); i++ {
  1141  			for cipherName, cipherID := range insecureCiphers {
  1142  				if tlsCipherSuites[i] == cipherID {
  1143  					klog.InfoS("Use of insecure cipher detected.", "cipher", cipherName)
  1144  				}
  1145  			}
  1146  		}
  1147  	}
  1148  
  1149  	minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
  1150  	if err != nil {
  1151  		return nil, err
  1152  	}
  1153  
  1154  	if minTLSVersion == tls.VersionTLS13 {
  1155  		if len(tlsCipherSuites) != 0 {
  1156  			klog.InfoS("Warning: TLS 1.3 cipher suites are not configurable, ignoring --tls-cipher-suites")
  1157  		}
  1158  	}
  1159  
  1160  	tlsOptions := &server.TLSOptions{
  1161  		Config: &tls.Config{
  1162  			MinVersion:   minTLSVersion,
  1163  			CipherSuites: tlsCipherSuites,
  1164  		},
  1165  		CertFile: kc.TLSCertFile,
  1166  		KeyFile:  kc.TLSPrivateKeyFile,
  1167  	}
  1168  
  1169  	if len(kc.Authentication.X509.ClientCAFile) > 0 {
  1170  		clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
  1171  		if err != nil {
  1172  			return nil, fmt.Errorf("unable to load client CA file %s: %w", kc.Authentication.X509.ClientCAFile, err)
  1173  		}
  1174  		// Specify allowed CAs for client certificates
  1175  		tlsOptions.Config.ClientCAs = clientCAs
  1176  		// Populate PeerCertificates in requests, but don't reject connections without verified certificates
  1177  		tlsOptions.Config.ClientAuth = tls.RequestClientCert
  1178  	}
  1179  
  1180  	return tlsOptions, nil
  1181  }
  1182  
  1183  // setContentTypeForClient sets the appropriate content type into the rest config
  1184  // and handles defaulting AcceptContentTypes based on that input.
  1185  func setContentTypeForClient(cfg *restclient.Config, contentType string) {
  1186  	if len(contentType) == 0 {
  1187  		return
  1188  	}
  1189  	cfg.ContentType = contentType
  1190  	switch contentType {
  1191  	case runtime.ContentTypeProtobuf:
  1192  		cfg.AcceptContentTypes = strings.Join([]string{runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
  1193  	default:
  1194  		// otherwise let the rest client perform defaulting
  1195  	}
  1196  }
  1197  
  1198  // RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
  1199  //
  1200  //	1 Integration tests
  1201  //	2 Kubelet binary
  1202  //	3 Standalone 'kubernetes' binary
  1203  //
  1204  // Eventually, #2 will be replaced with instances of #3
  1205  func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
  1206  	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
  1207  	if err != nil {
  1208  		return err
  1209  	}
  1210  	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
  1211  	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
  1212  	if err != nil {
  1213  		return err
  1214  	}
  1215  	hostnameOverridden := len(kubeServer.HostnameOverride) > 0
  1216  	// Setup event recorder if required.
  1217  	makeEventRecorder(context.TODO(), kubeDeps, nodeName)
  1218  
  1219  	nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider)
  1220  	if err != nil {
  1221  		return fmt.Errorf("bad --node-ip %q: %v", kubeServer.NodeIP, err)
  1222  	}
  1223  
  1224  	capabilities.Initialize(capabilities.Capabilities{
  1225  		AllowPrivileged: true,
  1226  	})
  1227  
  1228  	credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
  1229  	klog.V(2).InfoS("Using root directory", "path", kubeServer.RootDirectory)
  1230  
  1231  	if kubeDeps.OSInterface == nil {
  1232  		kubeDeps.OSInterface = kubecontainer.RealOS{}
  1233  	}
  1234  
  1235  	k, err := createAndInitKubelet(kubeServer,
  1236  		kubeDeps,
  1237  		hostname,
  1238  		hostnameOverridden,
  1239  		nodeName,
  1240  		nodeIPs)
  1241  	if err != nil {
  1242  		return fmt.Errorf("failed to create kubelet: %w", err)
  1243  	}
  1244  
  1245  	// NewMainKubelet should have set up a pod source config if one didn't exist
  1246  	// when the builder was run. This is just a precaution.
  1247  	if kubeDeps.PodConfig == nil {
  1248  		return fmt.Errorf("failed to create kubelet, pod source config was nil")
  1249  	}
  1250  	podCfg := kubeDeps.PodConfig
  1251  
  1252  	if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
  1253  		klog.ErrorS(err, "Failed to set rlimit on max file handles")
  1254  	}
  1255  
  1256  	// process pods and exit.
  1257  	if runOnce {
  1258  		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
  1259  			return fmt.Errorf("runonce failed: %w", err)
  1260  		}
  1261  		klog.InfoS("Started kubelet as runonce")
  1262  	} else {
  1263  		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
  1264  		klog.InfoS("Started kubelet")
  1265  	}
  1266  	return nil
  1267  }
  1268  
  1269  func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
  1270  	// start the kubelet
  1271  	go k.Run(podCfg.Updates())
  1272  
  1273  	// start the kubelet server
  1274  	if enableServer {
  1275  		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
  1276  	}
  1277  	if kubeCfg.ReadOnlyPort > 0 {
  1278  		go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
  1279  	}
  1280  	go k.ListenAndServePodResources()
  1281  }
  1282  
  1283  func createAndInitKubelet(kubeServer *options.KubeletServer,
  1284  	kubeDeps *kubelet.Dependencies,
  1285  	hostname string,
  1286  	hostnameOverridden bool,
  1287  	nodeName types.NodeName,
  1288  	nodeIPs []net.IP) (k kubelet.Bootstrap, err error) {
  1289  	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
  1290  	// up into "per source" synchronizations
  1291  
  1292  	k, err = kubelet.NewMainKubelet(&kubeServer.KubeletConfiguration,
  1293  		kubeDeps,
  1294  		&kubeServer.ContainerRuntimeOptions,
  1295  		hostname,
  1296  		hostnameOverridden,
  1297  		nodeName,
  1298  		nodeIPs,
  1299  		kubeServer.ProviderID,
  1300  		kubeServer.CloudProvider,
  1301  		kubeServer.CertDirectory,
  1302  		kubeServer.RootDirectory,
  1303  		kubeServer.PodLogsDir,
  1304  		kubeServer.ImageCredentialProviderConfigFile,
  1305  		kubeServer.ImageCredentialProviderBinDir,
  1306  		kubeServer.RegisterNode,
  1307  		kubeServer.RegisterWithTaints,
  1308  		kubeServer.AllowedUnsafeSysctls,
  1309  		kubeServer.ExperimentalMounterPath,
  1310  		kubeServer.KernelMemcgNotification,
  1311  		kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
  1312  		kubeServer.MinimumGCAge,
  1313  		kubeServer.MaxPerPodContainerCount,
  1314  		kubeServer.MaxContainerCount,
  1315  		kubeServer.RegisterSchedulable,
  1316  		kubeServer.KeepTerminatedPodVolumes,
  1317  		kubeServer.NodeLabels,
  1318  		kubeServer.NodeStatusMaxImages,
  1319  		kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault)
  1320  	if err != nil {
  1321  		return nil, err
  1322  	}
  1323  
  1324  	k.BirthCry()
  1325  
  1326  	k.StartGarbageCollection()
  1327  
  1328  	return k, nil
  1329  }
  1330  
  1331  // parseResourceList parses the given configuration map into an API
  1332  // ResourceList or returns an error.
  1333  func parseResourceList(m map[string]string) (v1.ResourceList, error) {
  1334  	if len(m) == 0 {
  1335  		return nil, nil
  1336  	}
  1337  	rl := make(v1.ResourceList)
  1338  	for k, v := range m {
  1339  		switch v1.ResourceName(k) {
  1340  		// CPU, memory, local storage, and PID resources are supported.
  1341  		case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceEphemeralStorage, pidlimit.PIDs:
  1342  			q, err := resource.ParseQuantity(v)
  1343  			if err != nil {
  1344  				return nil, fmt.Errorf("failed to parse quantity %q for %q resource: %w", v, k, err)
  1345  			}
  1346  			if q.Sign() == -1 {
  1347  				return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
  1348  			}
  1349  			rl[v1.ResourceName(k)] = q
  1350  		default:
  1351  			return nil, fmt.Errorf("cannot reserve %q resource", k)
  1352  		}
  1353  	}
  1354  	return rl, nil
  1355  }
  1356  
  1357  func newTracerProvider(s *options.KubeletServer) (oteltrace.TracerProvider, error) {
  1358  	if s.KubeletConfiguration.Tracing == nil {
  1359  		return oteltrace.NewNoopTracerProvider(), nil
  1360  	}
  1361  	hostname, err := nodeutil.GetHostname(s.HostnameOverride)
  1362  	if err != nil {
  1363  		return nil, fmt.Errorf("could not determine hostname for tracer provider: %w", err)
  1364  	}
  1365  	resourceOpts := []otelsdkresource.Option{
  1366  		otelsdkresource.WithAttributes(
  1367  			semconv.ServiceNameKey.String(componentKubelet),
  1368  			semconv.HostNameKey.String(hostname),
  1369  		),
  1370  	}
  1371  	tp, err := tracing.NewProvider(context.Background(), s.KubeletConfiguration.Tracing, []otlptracegrpc.Option{}, resourceOpts)
  1372  	if err != nil {
  1373  		return nil, fmt.Errorf("could not configure tracer provider: %w", err)
  1374  	}
  1375  	return tp, nil
  1376  }
  1377  
  1378  func getCgroupDriverFromCRI(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
  1379  	klog.V(4).InfoS("Getting CRI runtime configuration information")
  1380  
  1381  	var (
  1382  		runtimeConfig *runtimeapi.RuntimeConfigResponse
  1383  		err           error
  1384  	)
  1385  	// Retry a couple of times, hoping that any errors are transient.
  1386  	// Fail quickly on known, non transient errors.
  1387  	for i := 0; i < 3; i++ {
  1388  		runtimeConfig, err = kubeDeps.RemoteRuntimeService.RuntimeConfig(ctx)
  1389  		if err != nil {
  1390  			s, ok := status.FromError(err)
  1391  			if !ok || s.Code() != codes.Unimplemented {
  1392  				// We could introduce a backoff delay or jitter, but this is largely catching cases
  1393  				// where the runtime is still starting up and we request too early.
  1394  				// Give it a little more time.
  1395  				time.Sleep(time.Second * 2)
  1396  				continue
  1397  			}
  1398  			// CRI implementation doesn't support RuntimeConfig, fallback
  1399  			klog.InfoS("CRI implementation should be updated to support RuntimeConfig when KubeletCgroupDriverFromCRI feature gate has been enabled. Falling back to using cgroupDriver from kubelet config.")
  1400  			return nil
  1401  		}
  1402  	}
  1403  	if err != nil {
  1404  		return err
  1405  	}
  1406  
  1407  	// Calling GetLinux().GetCgroupDriver() won't segfault, but it will always default to systemd
  1408  	// which is not intended by the fields not being populated
  1409  	linuxConfig := runtimeConfig.GetLinux()
  1410  	if linuxConfig == nil {
  1411  		return nil
  1412  	}
  1413  
  1414  	switch d := linuxConfig.GetCgroupDriver(); d {
  1415  	case runtimeapi.CgroupDriver_SYSTEMD:
  1416  		s.CgroupDriver = "systemd"
  1417  	case runtimeapi.CgroupDriver_CGROUPFS:
  1418  		s.CgroupDriver = "cgroupfs"
  1419  	default:
  1420  		return fmt.Errorf("runtime returned an unknown cgroup driver %d", d)
  1421  	}
  1422  	klog.InfoS("Using cgroup driver setting received from the CRI runtime", "cgroupDriver", s.CgroupDriver)
  1423  	return nil
  1424  }
  1425  

View as plain text