...

Source file src/k8s.io/kubernetes/cmd/kube-proxy/app/server.go

Documentation: k8s.io/kubernetes/cmd/kube-proxy/app

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package app does all of the work necessary to configure and run a
    18  // Kubernetes app process.
    19  package app
    20  
    21  import (
    22  	"context"
    23  	goflag "flag"
    24  	"fmt"
    25  	"net"
    26  	"net/http"
    27  	"os"
    28  	"strings"
    29  	"time"
    30  
    31  	"github.com/fsnotify/fsnotify"
    32  	"github.com/spf13/cobra"
    33  	"github.com/spf13/pflag"
    34  
    35  	v1 "k8s.io/api/core/v1"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	"k8s.io/apimachinery/pkg/fields"
    38  	"k8s.io/apimachinery/pkg/labels"
    39  	"k8s.io/apimachinery/pkg/runtime"
    40  	"k8s.io/apimachinery/pkg/runtime/serializer"
    41  	"k8s.io/apimachinery/pkg/selection"
    42  	"k8s.io/apimachinery/pkg/types"
    43  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    44  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    45  	"k8s.io/apimachinery/pkg/util/validation/field"
    46  	"k8s.io/apimachinery/pkg/util/wait"
    47  	"k8s.io/apiserver/pkg/server/healthz"
    48  	"k8s.io/apiserver/pkg/server/mux"
    49  	"k8s.io/apiserver/pkg/server/routes"
    50  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    51  	"k8s.io/client-go/informers"
    52  	clientset "k8s.io/client-go/kubernetes"
    53  	"k8s.io/client-go/rest"
    54  	"k8s.io/client-go/tools/clientcmd"
    55  	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
    56  	"k8s.io/client-go/tools/events"
    57  	cliflag "k8s.io/component-base/cli/flag"
    58  	componentbaseconfig "k8s.io/component-base/config"
    59  	"k8s.io/component-base/configz"
    60  	"k8s.io/component-base/logs"
    61  	logsapi "k8s.io/component-base/logs/api/v1"
    62  	"k8s.io/component-base/metrics"
    63  	metricsfeatures "k8s.io/component-base/metrics/features"
    64  	"k8s.io/component-base/metrics/legacyregistry"
    65  	"k8s.io/component-base/metrics/prometheus/slis"
    66  	"k8s.io/component-base/version"
    67  	"k8s.io/component-base/version/verflag"
    68  	nodeutil "k8s.io/component-helpers/node/util"
    69  	"k8s.io/klog/v2"
    70  	"k8s.io/kube-proxy/config/v1alpha1"
    71  	api "k8s.io/kubernetes/pkg/apis/core"
    72  	"k8s.io/kubernetes/pkg/cluster/ports"
    73  	"k8s.io/kubernetes/pkg/features"
    74  	"k8s.io/kubernetes/pkg/kubelet/qos"
    75  	"k8s.io/kubernetes/pkg/proxy"
    76  	"k8s.io/kubernetes/pkg/proxy/apis"
    77  	kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
    78  	proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
    79  	kubeproxyconfigv1alpha1 "k8s.io/kubernetes/pkg/proxy/apis/config/v1alpha1"
    80  	"k8s.io/kubernetes/pkg/proxy/apis/config/validation"
    81  	"k8s.io/kubernetes/pkg/proxy/config"
    82  	"k8s.io/kubernetes/pkg/proxy/healthcheck"
    83  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    84  	"k8s.io/kubernetes/pkg/util/filesystem"
    85  	utilflag "k8s.io/kubernetes/pkg/util/flag"
    86  	utilnode "k8s.io/kubernetes/pkg/util/node"
    87  	"k8s.io/kubernetes/pkg/util/oom"
    88  	netutils "k8s.io/utils/net"
    89  	"k8s.io/utils/ptr"
    90  )
    91  
    92  func init() {
    93  	utilruntime.Must(metricsfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    94  	utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    95  }
    96  
// proxyRun defines the interface to run a specified ProxyServer.
// Options.runLoop calls Run in its own goroutine and forwards the returned
// error on Options.errCh.
type proxyRun interface {
	// Run runs the proxy server and returns the error it finished with.
	Run() error
}
   101  
// Options contains everything necessary to create and run a proxy server.
type Options struct {
	// ConfigFile is the location of the proxy server's configuration file.
	ConfigFile string
	// WriteConfigTo is the path where the default configuration will be written.
	WriteConfigTo string
	// CleanupAndExit, when true, makes the proxy server clean up iptables and ipvs rules, then exit.
	CleanupAndExit bool
	// InitAndExit, when true, makes the proxy server perform the configuration steps that need
	// privileged access, then exit.
	InitAndExit bool
	// WindowsService should be set to true if kube-proxy is running as a service on Windows.
	// Its corresponding flag only gets registered in Windows builds
	WindowsService bool
	// config is the proxy server's configuration object.
	config *kubeproxyconfig.KubeProxyConfiguration
	// watcher is used to watch on the update change of ConfigFile
	watcher filesystem.FSWatcher
	// proxyServer is the interface to run the proxy server
	proxyServer proxyRun
	// errCh is the channel that errors will be sent
	errCh chan error

	// The fields below here are placeholders for flags that can't be directly mapped into
	// config.KubeProxyConfiguration.
	//
	// TODO remove these fields once the deprecated flags are removed.

	// master is used to override the kubeconfig's URL to the apiserver.
	master string
	// healthzPort is the port to be used by the healthz server.
	healthzPort int32
	// metricsPort is the port to be used by the metrics server.
	metricsPort int32

	// hostnameOverride, if set from the command line flag, takes precedence over the `HostnameOverride` value from the config file
	hostnameOverride string

	// logger is used for log output by Options and is handed to the proxy server on creation.
	logger klog.Logger
}
   141  
   142  // AddFlags adds flags to fs and binds them to options.
   143  func (o *Options) AddFlags(fs *pflag.FlagSet) {
   144  	o.addOSFlags(fs)
   145  
   146  	fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
   147  	fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the default configuration values to this file and exit.")
   148  
   149  	fs.BoolVar(&o.CleanupAndExit, "cleanup", o.CleanupAndExit, "If true cleanup iptables and ipvs rules and exit.")
   150  
   151  	fs.Var(cliflag.NewMapStringBool(&o.config.FeatureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
   152  		"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n")+"\n"+
   153  		"This parameter is ignored if a config file is specified by --config.")
   154  
   155  	fs.StringVar(&o.config.ClientConnection.Kubeconfig, "kubeconfig", o.config.ClientConnection.Kubeconfig, "Path to kubeconfig file with authorization information (the master location can be overridden by the master flag).")
   156  	fs.StringVar(&o.master, "master", o.master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
   157  	fs.StringVar(&o.config.ClientConnection.ContentType, "kube-api-content-type", o.config.ClientConnection.ContentType, "Content type of requests sent to apiserver.")
   158  	fs.Int32Var(&o.config.ClientConnection.Burst, "kube-api-burst", o.config.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver")
   159  	fs.Float32Var(&o.config.ClientConnection.QPS, "kube-api-qps", o.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver")
   160  
   161  	fs.StringVar(&o.hostnameOverride, "hostname-override", o.hostnameOverride, "If non-empty, will be used as the name of the Node that kube-proxy is running on. If unset, the node name is assumed to be the same as the node's hostname.")
   162  	fs.Var(&utilflag.IPVar{Val: &o.config.BindAddress}, "bind-address", "Overrides kube-proxy's idea of what its node's primary IP is. Note that the name is a historical artifact, and kube-proxy does not actually bind any sockets to this IP. This parameter is ignored if a config file is specified by --config.")
   163  	fs.Var(&utilflag.IPPortVar{Val: &o.config.HealthzBindAddress}, "healthz-bind-address", "The IP address and port for the health check server to serve on, defaulting to \"0.0.0.0:10256\" (if --bind-address is unset or IPv4), or \"[::]:10256\" (if --bind-address is IPv6). Set empty to disable. This parameter is ignored if a config file is specified by --config.")
   164  	fs.Var(&utilflag.IPPortVar{Val: &o.config.MetricsBindAddress}, "metrics-bind-address", "The IP address and port for the metrics server to serve on, defaulting to \"127.0.0.1:10249\" (if --bind-address is unset or IPv4), or \"[::1]:10249\" (if --bind-address is IPv6). (Set to \"0.0.0.0:10249\" / \"[::]:10249\" to bind on all interfaces.) Set empty to disable. This parameter is ignored if a config file is specified by --config.")
   165  	fs.BoolVar(&o.config.BindAddressHardFail, "bind-address-hard-fail", o.config.BindAddressHardFail, "If true kube-proxy will treat failure to bind to a port as fatal and exit")
   166  	fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "If true enables profiling via web interface on /debug/pprof handler. This parameter is ignored if a config file is specified by --config.")
   167  	fs.StringVar(&o.config.ShowHiddenMetricsForVersion, "show-hidden-metrics-for-version", o.config.ShowHiddenMetricsForVersion,
   168  		"The previous version for which you want to show hidden metrics. "+
   169  			"Only the previous minor version is meaningful, other values will not be allowed. "+
   170  			"The format is <major>.<minor>, e.g.: '1.16'. "+
   171  			"The purpose of this format is make sure you have the opportunity to notice if the next release hides additional metrics, "+
   172  			"rather than being surprised when they are permanently removed in the release after that. "+
   173  			"This parameter is ignored if a config file is specified by --config.")
   174  	fs.BoolVar(&o.InitAndExit, "init-only", o.InitAndExit, "If true, perform any initialization steps that must be done with full root privileges, and then exit. After doing this, you can run kube-proxy again with only the CAP_NET_ADMIN capability.")
   175  	fs.Var(&o.config.Mode, "proxy-mode", "Which proxy mode to use: on Linux this can be 'iptables' (default) or 'ipvs'. On Windows the only supported value is 'kernelspace'."+
   176  		"This parameter is ignored if a config file is specified by --config.")
   177  
   178  	fs.Int32Var(o.config.IPTables.MasqueradeBit, "iptables-masquerade-bit", ptr.Deref(o.config.IPTables.MasqueradeBit, 14), "If using the iptables or ipvs proxy mode, the bit of the fwmark space to mark packets requiring SNAT with.  Must be within the range [0, 31].")
   179  	fs.BoolVar(&o.config.IPTables.MasqueradeAll, "masquerade-all", o.config.IPTables.MasqueradeAll, "If using the iptables or ipvs proxy mode, SNAT all traffic sent via Service cluster IPs. This may be required with some CNI plugins.")
   180  	fs.BoolVar(o.config.IPTables.LocalhostNodePorts, "iptables-localhost-nodeports", ptr.Deref(o.config.IPTables.LocalhostNodePorts, true), "If false, kube-proxy will disable the legacy behavior of allowing NodePort services to be accessed via localhost. (Applies only to iptables mode and IPv4; localhost NodePorts are never allowed with other proxy modes or with IPv6.)")
   181  	fs.DurationVar(&o.config.IPTables.SyncPeriod.Duration, "iptables-sync-period", o.config.IPTables.SyncPeriod.Duration, "An interval (e.g. '5s', '1m', '2h22m') indicating how frequently various re-synchronizing and cleanup operations are performed. Must be greater than 0.")
   182  	fs.DurationVar(&o.config.IPTables.MinSyncPeriod.Duration, "iptables-min-sync-period", o.config.IPTables.MinSyncPeriod.Duration, "The minimum period between iptables rule resyncs (e.g. '5s', '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will result in an immediate iptables resync.")
   183  
   184  	fs.DurationVar(&o.config.IPVS.SyncPeriod.Duration, "ipvs-sync-period", o.config.IPVS.SyncPeriod.Duration, "An interval (e.g. '5s', '1m', '2h22m') indicating how frequently various re-synchronizing and cleanup operations are performed. Must be greater than 0.")
   185  	fs.DurationVar(&o.config.IPVS.MinSyncPeriod.Duration, "ipvs-min-sync-period", o.config.IPVS.MinSyncPeriod.Duration, "The minimum period between IPVS rule resyncs (e.g. '5s', '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will result in an immediate IPVS resync.")
   186  	fs.StringVar(&o.config.IPVS.Scheduler, "ipvs-scheduler", o.config.IPVS.Scheduler, "The ipvs scheduler type when proxy mode is ipvs")
   187  	fs.StringSliceVar(&o.config.IPVS.ExcludeCIDRs, "ipvs-exclude-cidrs", o.config.IPVS.ExcludeCIDRs, "A comma-separated list of CIDRs which the ipvs proxier should not touch when cleaning up IPVS rules.")
   188  	fs.BoolVar(&o.config.IPVS.StrictARP, "ipvs-strict-arp", o.config.IPVS.StrictARP, "Enable strict ARP by setting arp_ignore to 1 and arp_announce to 2")
   189  	fs.DurationVar(&o.config.IPVS.TCPTimeout.Duration, "ipvs-tcp-timeout", o.config.IPVS.TCPTimeout.Duration, "The timeout for idle IPVS TCP connections, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
   190  	fs.DurationVar(&o.config.IPVS.TCPFinTimeout.Duration, "ipvs-tcpfin-timeout", o.config.IPVS.TCPFinTimeout.Duration, "The timeout for IPVS TCP connections after receiving a FIN packet, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
   191  	fs.DurationVar(&o.config.IPVS.UDPTimeout.Duration, "ipvs-udp-timeout", o.config.IPVS.UDPTimeout.Duration, "The timeout for IPVS UDP packets, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
   192  
   193  	fs.Var(&o.config.DetectLocalMode, "detect-local-mode", "Mode to use to detect local traffic. This parameter is ignored if a config file is specified by --config.")
   194  	fs.StringVar(&o.config.DetectLocal.BridgeInterface, "pod-bridge-interface", o.config.DetectLocal.BridgeInterface, "A bridge interface name. When --detect-local-mode is set to BridgeInterface, kube-proxy will consider traffic to be local if it originates from this bridge.")
   195  	fs.StringVar(&o.config.DetectLocal.InterfaceNamePrefix, "pod-interface-name-prefix", o.config.DetectLocal.InterfaceNamePrefix, "An interface name prefix. When --detect-local-mode is set to InterfaceNamePrefix, kube-proxy will consider traffic to be local if it originates from any interface whose name begins with this prefix.")
   196  	fs.StringVar(&o.config.ClusterCIDR, "cluster-cidr", o.config.ClusterCIDR, "The CIDR range of the pods in the cluster. (For dual-stack clusters, this can be a comma-separated dual-stack pair of CIDR ranges.). When --detect-local-mode is set to ClusterCIDR, kube-proxy will consider traffic to be local if its source IP is in this range. (Otherwise it is not used.) "+
   197  		"This parameter is ignored if a config file is specified by --config.")
   198  
   199  	fs.StringSliceVar(&o.config.NodePortAddresses, "nodeport-addresses", o.config.NodePortAddresses,
   200  		"A list of CIDR ranges that contain valid node IPs. If set, connections to NodePort services will only be accepted on node IPs in one of the indicated ranges. If unset, NodePort connections will be accepted on all local IPs. This parameter is ignored if a config file is specified by --config.")
   201  
   202  	fs.Int32Var(o.config.OOMScoreAdj, "oom-score-adj", ptr.Deref(o.config.OOMScoreAdj, int32(qos.KubeProxyOOMScoreAdj)), "The oom-score-adj value for kube-proxy process. Values must be within the range [-1000, 1000]. This parameter is ignored if a config file is specified by --config.")
   203  	fs.Int32Var(o.config.Conntrack.MaxPerCore, "conntrack-max-per-core", *o.config.Conntrack.MaxPerCore,
   204  		"Maximum number of NAT connections to track per CPU core (0 to leave the limit as-is and ignore conntrack-min).")
   205  	fs.Int32Var(o.config.Conntrack.Min, "conntrack-min", *o.config.Conntrack.Min,
   206  		"Minimum number of conntrack entries to allocate, regardless of conntrack-max-per-core (set conntrack-max-per-core=0 to leave the limit as-is).")
   207  
   208  	fs.DurationVar(&o.config.Conntrack.TCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", o.config.Conntrack.TCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
   209  	fs.DurationVar(
   210  		&o.config.Conntrack.TCPCloseWaitTimeout.Duration, "conntrack-tcp-timeout-close-wait",
   211  		o.config.Conntrack.TCPCloseWaitTimeout.Duration,
   212  		"NAT timeout for TCP connections in the CLOSE_WAIT state")
   213  	fs.BoolVar(&o.config.Conntrack.TCPBeLiberal, "conntrack-tcp-be-liberal", o.config.Conntrack.TCPBeLiberal, "Enable liberal mode for tracking TCP packets by setting nf_conntrack_tcp_be_liberal to 1")
   214  	fs.DurationVar(&o.config.Conntrack.UDPTimeout.Duration, "conntrack-udp-timeout", o.config.Conntrack.UDPTimeout.Duration, "Idle timeout for UNREPLIED UDP connections (0 to leave as-is)")
   215  	fs.DurationVar(&o.config.Conntrack.UDPStreamTimeout.Duration, "conntrack-udp-timeout-stream", o.config.Conntrack.UDPStreamTimeout.Duration, "Idle timeout for ASSURED UDP connections (0 to leave as-is)")
   216  
   217  	fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed.  Must be greater than 0.")
   218  
   219  	fs.Int32Var(&o.healthzPort, "healthz-port", o.healthzPort, "The port to bind the health check server. Use 0 to disable.")
   220  	_ = fs.MarkDeprecated("healthz-port", "This flag is deprecated and will be removed in a future release. Please use --healthz-bind-address instead.")
   221  	fs.Int32Var(&o.metricsPort, "metrics-port", o.metricsPort, "The port to bind the metrics server. Use 0 to disable.")
   222  	_ = fs.MarkDeprecated("metrics-port", "This flag is deprecated and will be removed in a future release. Please use --metrics-bind-address instead.")
   223  	fs.Var(utilflag.PortRangeVar{Val: &o.config.PortRange}, "proxy-port-range", "This was previously used to configure the userspace proxy, but is now unused.")
   224  	_ = fs.MarkDeprecated("proxy-port-range", "This flag has no effect and will be removed in a future release.")
   225  
   226  	logsapi.AddFlags(&o.config.Logging, fs)
   227  }
   228  
   229  // newKubeProxyConfiguration returns a KubeProxyConfiguration with default values
   230  func newKubeProxyConfiguration() *kubeproxyconfig.KubeProxyConfiguration {
   231  	versionedConfig := &v1alpha1.KubeProxyConfiguration{}
   232  	proxyconfigscheme.Scheme.Default(versionedConfig)
   233  	internalConfig, err := proxyconfigscheme.Scheme.ConvertToVersion(versionedConfig, kubeproxyconfig.SchemeGroupVersion)
   234  	if err != nil {
   235  		panic(fmt.Sprintf("Unable to create default config: %v", err))
   236  	}
   237  
   238  	return internalConfig.(*kubeproxyconfig.KubeProxyConfiguration)
   239  }
   240  
   241  // NewOptions returns initialized Options
   242  func NewOptions() *Options {
   243  	return &Options{
   244  		config:      newKubeProxyConfiguration(),
   245  		healthzPort: ports.ProxyHealthzPort,
   246  		metricsPort: ports.ProxyStatusPort,
   247  		errCh:       make(chan error),
   248  		logger:      klog.FromContext(context.Background()),
   249  	}
   250  }
   251  
   252  // Complete completes all the required options.
   253  func (o *Options) Complete(fs *pflag.FlagSet) error {
   254  	if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
   255  		o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)
   256  		o.config.MetricsBindAddress = addressFromDeprecatedFlags(o.config.MetricsBindAddress, o.metricsPort)
   257  	}
   258  
   259  	// Load the config file here in Complete, so that Validate validates the fully-resolved config.
   260  	if len(o.ConfigFile) > 0 {
   261  		c, err := o.loadConfigFromFile(o.ConfigFile)
   262  		if err != nil {
   263  			return err
   264  		}
   265  
   266  		// Before we overwrite the config which holds the parsed
   267  		// command line parameters, we need to copy all modified
   268  		// logging settings over to the loaded config (i.e.  logging
   269  		// command line flags have priority). Otherwise `--config
   270  		// ... -v=5` doesn't work (config resets verbosity even
   271  		// when it contains no logging settings).
   272  		copyLogsFromFlags(fs, &c.Logging)
   273  		o.config = c
   274  
   275  		if err := o.initWatcher(); err != nil {
   276  			return err
   277  		}
   278  	}
   279  
   280  	o.platformApplyDefaults(o.config)
   281  
   282  	if err := o.processHostnameOverrideFlag(); err != nil {
   283  		return err
   284  	}
   285  
   286  	return utilfeature.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates)
   287  }
   288  
   289  // copyLogsFromFlags applies the logging flags from the given flag set to the given
   290  // configuration. Fields for which the corresponding flag was not used are left
   291  // unmodified. For fields that have multiple values (like vmodule), the values from
   292  // the flags get joined so that the command line flags have priority.
   293  //
   294  // TODO (pohly): move this to logsapi
   295  func copyLogsFromFlags(from *pflag.FlagSet, to *logsapi.LoggingConfiguration) error {
   296  	var cloneFS pflag.FlagSet
   297  	logsapi.AddFlags(to, &cloneFS)
   298  	vmodule := to.VModule
   299  	to.VModule = nil
   300  	var err error
   301  	cloneFS.VisitAll(func(f *pflag.Flag) {
   302  		if err != nil {
   303  			return
   304  		}
   305  		fsFlag := from.Lookup(f.Name)
   306  		if fsFlag == nil {
   307  			err = fmt.Errorf("logging flag %s not found in flag set", f.Name)
   308  			return
   309  		}
   310  		if !fsFlag.Changed {
   311  			return
   312  		}
   313  		if setErr := f.Value.Set(fsFlag.Value.String()); setErr != nil {
   314  			err = fmt.Errorf("copying flag %s value: %v", f.Name, setErr)
   315  			return
   316  		}
   317  	})
   318  	to.VModule = append(to.VModule, vmodule...)
   319  	return err
   320  }
   321  
   322  // Creates a new filesystem watcher and adds watches for the config file.
   323  func (o *Options) initWatcher() error {
   324  	fswatcher := filesystem.NewFsnotifyWatcher()
   325  	err := fswatcher.Init(o.eventHandler, o.errorHandler)
   326  	if err != nil {
   327  		return err
   328  	}
   329  	err = fswatcher.AddWatch(o.ConfigFile)
   330  	if err != nil {
   331  		return err
   332  	}
   333  	o.watcher = fswatcher
   334  	return nil
   335  }
   336  
   337  func (o *Options) eventHandler(ent fsnotify.Event) {
   338  	if ent.Has(fsnotify.Write) || ent.Has(fsnotify.Rename) {
   339  		// error out when ConfigFile is updated
   340  		o.errCh <- fmt.Errorf("content of the proxy server's configuration file was updated")
   341  		return
   342  	}
   343  	o.errCh <- nil
   344  }
   345  
// errorHandler is registered with the config file watcher as its error
// callback; it forwards the reported error on errCh.
func (o *Options) errorHandler(err error) {
	o.errCh <- err
}
   349  
   350  // processHostnameOverrideFlag processes hostname-override flag
   351  func (o *Options) processHostnameOverrideFlag() error {
   352  	// Check if hostname-override flag is set and use value since configFile always overrides
   353  	if len(o.hostnameOverride) > 0 {
   354  		hostName := strings.TrimSpace(o.hostnameOverride)
   355  		if len(hostName) == 0 {
   356  			return fmt.Errorf("empty hostname-override is invalid")
   357  		}
   358  		o.config.HostnameOverride = strings.ToLower(hostName)
   359  	}
   360  
   361  	return nil
   362  }
   363  
   364  // Validate validates all the required options.
   365  func (o *Options) Validate() error {
   366  	if errs := validation.Validate(o.config); len(errs) != 0 {
   367  		return errs.ToAggregate()
   368  	}
   369  
   370  	return nil
   371  }
   372  
   373  // Run runs the specified ProxyServer.
   374  func (o *Options) Run() error {
   375  	defer close(o.errCh)
   376  	if len(o.WriteConfigTo) > 0 {
   377  		return o.writeConfigFile()
   378  	}
   379  
   380  	err := platformCleanup(o.config.Mode, o.CleanupAndExit)
   381  	if o.CleanupAndExit {
   382  		return err
   383  	}
   384  	// We ignore err otherwise; the cleanup is best-effort, and the backends will have
   385  	// logged messages if they failed in interesting ways.
   386  
   387  	proxyServer, err := newProxyServer(o.logger, o.config, o.master, o.InitAndExit)
   388  	if err != nil {
   389  		return err
   390  	}
   391  	if o.InitAndExit {
   392  		return nil
   393  	}
   394  
   395  	o.proxyServer = proxyServer
   396  	return o.runLoop()
   397  }
   398  
   399  // runLoop will watch on the update change of the proxy server's configuration file.
   400  // Return an error when updated
   401  func (o *Options) runLoop() error {
   402  	if o.watcher != nil {
   403  		o.watcher.Run()
   404  	}
   405  
   406  	// run the proxy in goroutine
   407  	go func() {
   408  		err := o.proxyServer.Run()
   409  		o.errCh <- err
   410  	}()
   411  
   412  	for {
   413  		err := <-o.errCh
   414  		if err != nil {
   415  			return err
   416  		}
   417  	}
   418  }
   419  
   420  func (o *Options) writeConfigFile() (err error) {
   421  	const mediaType = runtime.ContentTypeYAML
   422  	info, ok := runtime.SerializerInfoForMediaType(proxyconfigscheme.Codecs.SupportedMediaTypes(), mediaType)
   423  	if !ok {
   424  		return fmt.Errorf("unable to locate encoder -- %q is not a supported media type", mediaType)
   425  	}
   426  
   427  	encoder := proxyconfigscheme.Codecs.EncoderForVersion(info.Serializer, v1alpha1.SchemeGroupVersion)
   428  
   429  	configFile, err := os.Create(o.WriteConfigTo)
   430  	if err != nil {
   431  		return err
   432  	}
   433  
   434  	defer func() {
   435  		ferr := configFile.Close()
   436  		if ferr != nil && err == nil {
   437  			err = ferr
   438  		}
   439  	}()
   440  
   441  	if err = encoder.Encode(o.config, configFile); err != nil {
   442  		return err
   443  	}
   444  
   445  	o.logger.Info("Wrote configuration", "file", o.WriteConfigTo)
   446  
   447  	return nil
   448  }
   449  
   450  // addressFromDeprecatedFlags returns server address from flags
   451  // passed on the command line based on the following rules:
   452  // 1. If port is 0, disable the server (e.g. set address to empty).
   453  // 2. Otherwise, set the port portion of the config accordingly.
   454  func addressFromDeprecatedFlags(addr string, port int32) string {
   455  	if port == 0 {
   456  		return ""
   457  	}
   458  	return proxyutil.AppendPortIfNeeded(addr, port)
   459  }
   460  
   461  // newLenientSchemeAndCodecs returns a scheme that has only v1alpha1 registered into
   462  // it and a CodecFactory with strict decoding disabled.
   463  func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
   464  	lenientScheme := runtime.NewScheme()
   465  	if err := kubeproxyconfig.AddToScheme(lenientScheme); err != nil {
   466  		return nil, nil, fmt.Errorf("failed to add kube-proxy config API to lenient scheme: %v", err)
   467  	}
   468  	if err := kubeproxyconfigv1alpha1.AddToScheme(lenientScheme); err != nil {
   469  		return nil, nil, fmt.Errorf("failed to add kube-proxy config v1alpha1 API to lenient scheme: %v", err)
   470  	}
   471  	lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict)
   472  	return lenientScheme, &lenientCodecs, nil
   473  }
   474  
   475  // loadConfigFromFile loads the contents of file and decodes it as a
   476  // KubeProxyConfiguration object.
   477  func (o *Options) loadConfigFromFile(file string) (*kubeproxyconfig.KubeProxyConfiguration, error) {
   478  	data, err := os.ReadFile(file)
   479  	if err != nil {
   480  		return nil, err
   481  	}
   482  
   483  	return o.loadConfig(data)
   484  }
   485  
   486  // loadConfig decodes a serialized KubeProxyConfiguration to the internal type.
   487  func (o *Options) loadConfig(data []byte) (*kubeproxyconfig.KubeProxyConfiguration, error) {
   488  
   489  	configObj, gvk, err := proxyconfigscheme.Codecs.UniversalDecoder().Decode(data, nil, nil)
   490  	if err != nil {
   491  		// Try strict decoding first. If that fails decode with a lenient
   492  		// decoder, which has only v1alpha1 registered, and log a warning.
   493  		// The lenient path is to be dropped when support for v1alpha1 is dropped.
   494  		if !runtime.IsStrictDecodingError(err) {
   495  			return nil, fmt.Errorf("failed to decode: %w", err)
   496  		}
   497  
   498  		_, lenientCodecs, lenientErr := newLenientSchemeAndCodecs()
   499  		if lenientErr != nil {
   500  			return nil, lenientErr
   501  		}
   502  
   503  		configObj, gvk, lenientErr = lenientCodecs.UniversalDecoder().Decode(data, nil, nil)
   504  		if lenientErr != nil {
   505  			// Lenient decoding failed with the current version, return the
   506  			// original strict error.
   507  			return nil, fmt.Errorf("failed lenient decoding: %v", err)
   508  		}
   509  
   510  		// Continue with the v1alpha1 object that was decoded leniently, but emit a warning.
   511  		o.logger.Info("Using lenient decoding as strict decoding failed", "err", err)
   512  	}
   513  
   514  	proxyConfig, ok := configObj.(*kubeproxyconfig.KubeProxyConfiguration)
   515  	if !ok {
   516  		return nil, fmt.Errorf("got unexpected config type: %v", gvk)
   517  	}
   518  	return proxyConfig, nil
   519  }
   520  
   521  // NewProxyCommand creates a *cobra.Command object with default parameters
   522  func NewProxyCommand() *cobra.Command {
   523  	opts := NewOptions()
   524  
   525  	cmd := &cobra.Command{
   526  		Use: "kube-proxy",
   527  		Long: `The Kubernetes network proxy runs on each node. This
   528  reflects services as defined in the Kubernetes API on each node and can do simple
   529  TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends.
   530  Service cluster IPs and ports are currently found through Docker-links-compatible
   531  environment variables specifying ports opened by the service proxy. There is an optional
   532  addon that provides cluster DNS for these cluster IPs. The user must create a service
   533  with the apiserver API to configure the proxy.`,
   534  		RunE: func(cmd *cobra.Command, args []string) error {
   535  			verflag.PrintAndExitIfRequested()
   536  
   537  			if err := initForOS(opts.WindowsService); err != nil {
   538  				return fmt.Errorf("failed os init: %w", err)
   539  			}
   540  
   541  			if err := opts.Complete(cmd.Flags()); err != nil {
   542  				return fmt.Errorf("failed complete: %w", err)
   543  			}
   544  
   545  			logs.InitLogs()
   546  			if err := logsapi.ValidateAndApplyAsField(&opts.config.Logging, utilfeature.DefaultFeatureGate, field.NewPath("logging")); err != nil {
   547  				return fmt.Errorf("initialize logging: %v", err)
   548  			}
   549  
   550  			cliflag.PrintFlags(cmd.Flags())
   551  
   552  			if err := opts.Validate(); err != nil {
   553  				return fmt.Errorf("failed validate: %w", err)
   554  			}
   555  			// add feature enablement metrics
   556  			utilfeature.DefaultMutableFeatureGate.AddMetrics()
   557  			if err := opts.Run(); err != nil {
   558  				opts.logger.Error(err, "Error running ProxyServer")
   559  				return err
   560  			}
   561  
   562  			return nil
   563  		},
   564  		Args: func(cmd *cobra.Command, args []string) error {
   565  			for _, arg := range args {
   566  				if len(arg) > 0 {
   567  					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
   568  				}
   569  			}
   570  			return nil
   571  		},
   572  	}
   573  
   574  	fs := cmd.Flags()
   575  	opts.AddFlags(fs)
   576  	fs.AddGoFlagSet(goflag.CommandLine) // for --boot-id-file and --machine-id-file
   577  
   578  	_ = cmd.MarkFlagFilename("config", "yaml", "yml", "json")
   579  
   580  	return cmd
   581  }
   582  
// ProxyServer represents all the parameters required to start the Kubernetes proxy server.
//
// newProxyServer populates the fields as follows:
//   - Config: the KubeProxyConfiguration the server was created from.
//   - Client: the API client returned by createClient.
//   - Broadcaster / Recorder: the event broadcaster and the "kube-proxy" recorder
//     derived from it; Run skips event recording when Broadcaster is nil.
//   - NodeRef: a Node object reference named after Hostname, used as the subject
//     of the events emitted by Run and birthCry.
//   - HealthzServer: set only when Config.HealthzBindAddress is non-empty;
//     serveHealthz does nothing when it is nil.
//   - Hostname: the node name resolved from Config.HostnameOverride.
//   - PrimaryIPFamily / NodeIPs: the results of detectNodeIPs.
//   - Proxier: the backend returned by createProxier.
//   - logger: the logger passed to newProxyServer.
type ProxyServer struct {
	Config *kubeproxyconfig.KubeProxyConfiguration

	Client          clientset.Interface
	Broadcaster     events.EventBroadcaster
	Recorder        events.EventRecorder
	NodeRef         *v1.ObjectReference
	HealthzServer   *healthcheck.ProxierHealthServer
	Hostname        string
	PrimaryIPFamily v1.IPFamily
	NodeIPs         map[v1.IPFamily]net.IP

	podCIDRs []string // only used for LocalModeNodeCIDR

	Proxier proxy.Provider

	logger klog.Logger
}
   603  
   604  // newProxyServer creates a ProxyServer based on the given config
   605  func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
   606  	s := &ProxyServer{
   607  		Config: config,
   608  		logger: logger,
   609  	}
   610  
   611  	cz, err := configz.New(kubeproxyconfig.GroupName)
   612  	if err != nil {
   613  		return nil, fmt.Errorf("unable to register configz: %s", err)
   614  	}
   615  	cz.Set(config)
   616  
   617  	if len(config.ShowHiddenMetricsForVersion) > 0 {
   618  		metrics.SetShowHidden()
   619  	}
   620  
   621  	s.Hostname, err = nodeutil.GetHostname(config.HostnameOverride)
   622  	if err != nil {
   623  		return nil, err
   624  	}
   625  
   626  	s.Client, err = createClient(logger, config.ClientConnection, master)
   627  	if err != nil {
   628  		return nil, err
   629  	}
   630  
   631  	rawNodeIPs := getNodeIPs(logger, s.Client, s.Hostname)
   632  	s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(logger, rawNodeIPs, config.BindAddress)
   633  
   634  	s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()})
   635  	s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")
   636  
   637  	s.NodeRef = &v1.ObjectReference{
   638  		Kind:      "Node",
   639  		Name:      s.Hostname,
   640  		UID:       types.UID(s.Hostname),
   641  		Namespace: "",
   642  	}
   643  
   644  	if len(config.HealthzBindAddress) > 0 {
   645  		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
   646  	}
   647  
   648  	err = s.platformSetup()
   649  	if err != nil {
   650  		return nil, err
   651  	}
   652  
   653  	ipv4Supported, ipv6Supported, dualStackSupported, err := s.platformCheckSupported()
   654  	if err != nil {
   655  		return nil, err
   656  	} else if (s.PrimaryIPFamily == v1.IPv4Protocol && !ipv4Supported) || (s.PrimaryIPFamily == v1.IPv6Protocol && !ipv6Supported) {
   657  		return nil, fmt.Errorf("no support for primary IP family %q", s.PrimaryIPFamily)
   658  	} else if dualStackSupported {
   659  		logger.Info("kube-proxy running in dual-stack mode", "primary ipFamily", s.PrimaryIPFamily)
   660  	} else {
   661  		logger.Info("kube-proxy running in single-stack mode", "ipFamily", s.PrimaryIPFamily)
   662  	}
   663  
   664  	err, fatal := checkIPConfig(s, dualStackSupported)
   665  	if err != nil {
   666  		if fatal {
   667  			return nil, fmt.Errorf("kube-proxy configuration is incorrect: %v", err)
   668  		}
   669  		logger.Error(err, "Kube-proxy configuration may be incomplete or incorrect")
   670  	}
   671  
   672  	s.Proxier, err = s.createProxier(config, dualStackSupported, initOnly)
   673  	if err != nil {
   674  		return nil, err
   675  	}
   676  
   677  	return s, nil
   678  }
   679  
   680  // checkIPConfig confirms that s has proper configuration for its primary IP family.
   681  func checkIPConfig(s *ProxyServer, dualStackSupported bool) (error, bool) {
   682  	var errors []error
   683  	var badFamily netutils.IPFamily
   684  
   685  	if s.PrimaryIPFamily == v1.IPv4Protocol {
   686  		badFamily = netutils.IPv6
   687  	} else {
   688  		badFamily = netutils.IPv4
   689  	}
   690  
   691  	var clusterType string
   692  	if dualStackSupported {
   693  		clusterType = fmt.Sprintf("%s-primary", s.PrimaryIPFamily)
   694  	} else {
   695  		clusterType = fmt.Sprintf("%s-only", s.PrimaryIPFamily)
   696  	}
   697  
   698  	// Historically, we did not check most of the config options, so we cannot
   699  	// retroactively make IP family mismatches in those options be fatal. When we add
   700  	// new options to check here, we should make problems with those options be fatal.
   701  	fatal := false
   702  
   703  	if s.Config.ClusterCIDR != "" {
   704  		clusterCIDRs := strings.Split(s.Config.ClusterCIDR, ",")
   705  		if badCIDRs(clusterCIDRs, badFamily) {
   706  			errors = append(errors, fmt.Errorf("cluster is %s but clusterCIDRs contains only IPv%s addresses", clusterType, badFamily))
   707  			if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeClusterCIDR && !dualStackSupported {
   708  				// This has always been a fatal error
   709  				fatal = true
   710  			}
   711  		}
   712  	}
   713  
   714  	if badCIDRs(s.Config.NodePortAddresses, badFamily) {
   715  		errors = append(errors, fmt.Errorf("cluster is %s but nodePortAddresses contains only IPv%s addresses", clusterType, badFamily))
   716  	}
   717  
   718  	if badCIDRs(s.podCIDRs, badFamily) {
   719  		errors = append(errors, fmt.Errorf("cluster is %s but node.spec.podCIDRs contains only IPv%s addresses", clusterType, badFamily))
   720  		if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
   721  			// This has always been a fatal error
   722  			fatal = true
   723  		}
   724  	}
   725  
   726  	if netutils.IPFamilyOfString(s.Config.Winkernel.SourceVip) == badFamily {
   727  		errors = append(errors, fmt.Errorf("cluster is %s but winkernel.sourceVip is IPv%s", clusterType, badFamily))
   728  	}
   729  
   730  	// In some cases, wrong-IP-family is only a problem when the secondary IP family
   731  	// isn't present at all.
   732  	if !dualStackSupported {
   733  		if badCIDRs(s.Config.IPVS.ExcludeCIDRs, badFamily) {
   734  			errors = append(errors, fmt.Errorf("cluster is %s but ipvs.excludeCIDRs contains only IPv%s addresses", clusterType, badFamily))
   735  		}
   736  
   737  		if badBindAddress(s.Config.HealthzBindAddress, badFamily) {
   738  			errors = append(errors, fmt.Errorf("cluster is %s but healthzBindAddress is IPv%s", clusterType, badFamily))
   739  		}
   740  		if badBindAddress(s.Config.MetricsBindAddress, badFamily) {
   741  			errors = append(errors, fmt.Errorf("cluster is %s but metricsBindAddress is IPv%s", clusterType, badFamily))
   742  		}
   743  	}
   744  
   745  	return utilerrors.NewAggregate(errors), fatal
   746  }
   747  
   748  // badCIDRs returns true if cidrs is a non-empty list of CIDRs, all of wrongFamily.
   749  func badCIDRs(cidrs []string, wrongFamily netutils.IPFamily) bool {
   750  	if len(cidrs) == 0 {
   751  		return false
   752  	}
   753  	for _, cidr := range cidrs {
   754  		if netutils.IPFamilyOfCIDRString(cidr) != wrongFamily {
   755  			return false
   756  		}
   757  	}
   758  	return true
   759  }
   760  
   761  // badBindAddress returns true if bindAddress is an "IP:port" string where IP is a
   762  // non-zero IP of wrongFamily.
   763  func badBindAddress(bindAddress string, wrongFamily netutils.IPFamily) bool {
   764  	if host, _, _ := net.SplitHostPort(bindAddress); host != "" {
   765  		ip := netutils.ParseIPSloppy(host)
   766  		if ip != nil && netutils.IPFamilyOf(ip) == wrongFamily && !ip.IsUnspecified() {
   767  			return true
   768  		}
   769  	}
   770  	return false
   771  }
   772  
   773  // createClient creates a kube client from the given config and masterOverride.
   774  // TODO remove masterOverride when CLI flags are removed.
   775  func createClient(logger klog.Logger, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
   776  	var kubeConfig *rest.Config
   777  	var err error
   778  
   779  	if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
   780  		logger.Info("Neither kubeconfig file nor master URL was specified, falling back to in-cluster config")
   781  		kubeConfig, err = rest.InClusterConfig()
   782  	} else {
   783  		// This creates a client, first loading any specified kubeconfig
   784  		// file, and then overriding the Master flag, if non-empty.
   785  		kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
   786  			&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
   787  			&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
   788  	}
   789  	if err != nil {
   790  		return nil, err
   791  	}
   792  
   793  	kubeConfig.AcceptContentTypes = config.AcceptContentTypes
   794  	kubeConfig.ContentType = config.ContentType
   795  	kubeConfig.QPS = config.QPS
   796  	kubeConfig.Burst = int(config.Burst)
   797  
   798  	client, err := clientset.NewForConfig(kubeConfig)
   799  	if err != nil {
   800  		return nil, err
   801  	}
   802  
   803  	return client, nil
   804  }
   805  
   806  func serveHealthz(logger klog.Logger, hz *healthcheck.ProxierHealthServer, errCh chan error) {
   807  	if hz == nil {
   808  		return
   809  	}
   810  
   811  	fn := func() {
   812  		err := hz.Run()
   813  		if err != nil {
   814  			logger.Error(err, "Healthz server failed")
   815  			if errCh != nil {
   816  				errCh <- fmt.Errorf("healthz server failed: %v", err)
   817  				// if in hardfail mode, never retry again
   818  				blockCh := make(chan error)
   819  				<-blockCh
   820  			}
   821  		} else {
   822  			logger.Error(nil, "Healthz server returned without error")
   823  		}
   824  	}
   825  	go wait.Until(fn, 5*time.Second, wait.NeverStop)
   826  }
   827  
   828  func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
   829  	if len(bindAddress) == 0 {
   830  		return
   831  	}
   832  
   833  	proxyMux := mux.NewPathRecorderMux("kube-proxy")
   834  	healthz.InstallHandler(proxyMux)
   835  	slis.SLIMetricsWithReset{}.Install(proxyMux)
   836  
   837  	proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
   838  		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
   839  		w.Header().Set("X-Content-Type-Options", "nosniff")
   840  		fmt.Fprintf(w, "%s", proxyMode)
   841  	})
   842  
   843  	proxyMux.Handle("/metrics", legacyregistry.Handler())
   844  
   845  	if enableProfiling {
   846  		routes.Profiling{}.Install(proxyMux)
   847  		routes.DebugFlags{}.Install(proxyMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
   848  	}
   849  
   850  	configz.InstallHandler(proxyMux)
   851  
   852  	fn := func() {
   853  		err := http.ListenAndServe(bindAddress, proxyMux)
   854  		if err != nil {
   855  			err = fmt.Errorf("starting metrics server failed: %v", err)
   856  			utilruntime.HandleError(err)
   857  			if errCh != nil {
   858  				errCh <- err
   859  				// if in hardfail mode, never retry again
   860  				blockCh := make(chan error)
   861  				<-blockCh
   862  			}
   863  		}
   864  	}
   865  	go wait.Until(fn, 5*time.Second, wait.NeverStop)
   866  }
   867  
// Run runs the specified ProxyServer. It applies the configured OOM score
// adjustment, starts event recording, the optional healthz and metrics servers, the
// Service / EndpointSlice / ServiceCIDR / Node watchers and the proxier's sync loop,
// and then blocks until the healthz or metrics server reports an error.
//
// This should never exit (unless CleanupAndExit is set). The error channels are only
// created when Config.BindAddressHardFail is set; otherwise both remain nil and the
// final select blocks forever.
// TODO: At the moment, Run() cannot return a nil error, otherwise its caller will never exit. Update callers of Run to handle nil errors.
func (s *ProxyServer) Run() error {
	// To help debugging, immediately log version
	s.logger.Info("Version info", "version", version.Get())

	s.logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	// TODO(vmarmol): Use container config for this.
	// A failure to apply the OOM score is only logged (at verbosity 2), not returned.
	var oomAdjuster *oom.OOMAdjuster
	if s.Config.OOMScoreAdj != nil {
		oomAdjuster = oom.NewOOMAdjuster()
		if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.Config.OOMScoreAdj)); err != nil {
			s.logger.V(2).Info("Failed to apply OOMScore", "err", err)
		}
	}

	// stopCh is never closed here, so event recording runs for the life of the process.
	if s.Broadcaster != nil {
		stopCh := make(chan struct{})
		s.Broadcaster.StartRecordingToSink(stopCh)
	}

	// TODO(thockin): make it possible for healthz and metrics to be on the same port.

	// In hard-fail mode the servers report their first failure on these channels;
	// otherwise the channels stay nil and the servers keep retrying on their own.
	var healthzErrCh, metricsErrCh chan error
	if s.Config.BindAddressHardFail {
		healthzErrCh = make(chan error)
		metricsErrCh = make(chan error)
	}

	// Start up a healthz server if requested
	serveHealthz(s.logger, s.HealthzServer, healthzErrCh)

	// Start up a metrics server if requested
	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)

	// Select only objects that carry neither the service-proxy-name label nor the
	// headless-service label.
	noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
	if err != nil {
		return err
	}

	noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
	if err != nil {
		return err
	}

	labelSelector := labels.NewSelector()
	labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

	// Make informers that filter out objects that want a non-default service proxy.
	informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.Config.ConfigSyncPeriod.Duration,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.LabelSelector = labelSelector.String()
		}))

	// Create configs (i.e. Watches for Services, EndpointSlices and ServiceCIDRs)
	// Note: RegisterHandler() calls need to happen before creation of Sources because sources
	// only notify on changes, and the initial update (on process start) may be lost if no handlers
	// are registered yet.
	serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
	serviceConfig.RegisterEventHandler(s.Proxier)
	go serviceConfig.Run(wait.NeverStop)

	endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
	endpointSliceConfig.RegisterEventHandler(s.Proxier)
	go endpointSliceConfig.Run(wait.NeverStop)

	// ServiceCIDRs are only watched when the MultiCIDRServiceAllocator feature gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
		serviceCIDRConfig := config.NewServiceCIDRConfig(informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
		serviceCIDRConfig.RegisterEventHandler(s.Proxier)
		go serviceCIDRConfig.Run(wait.NeverStop)
	}
	// This has to start after the calls to NewServiceConfig because that
	// function must configure its shared informer event handlers first.
	informerFactory.Start(wait.NeverStop)

	// Make an informer that selects for our nodename.
	currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.Config.ConfigSyncPeriod.Duration,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
		}))
	nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration)
	// https://issues.k8s.io/111321
	if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
		nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(s.podCIDRs))
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeProxyDrainingTerminatingNodes) {
		nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
			HealthServer: s.HealthzServer,
		})
	}
	nodeConfig.RegisterEventHandler(s.Proxier)

	go nodeConfig.Run(wait.NeverStop)

	// This has to start after the calls to NewNodeConfig because that must
	// configure the shared informer event handler first.
	currentNodeInformerFactory.Start(wait.NeverStop)

	// Birth Cry after the birth is successful
	s.birthCry()

	go s.Proxier.SyncLoop()

	// Block until one of the servers reports a hard failure, record it as a warning
	// event on the node and return it. With nil channels this never unblocks.
	select {
	case err = <-healthzErrCh:
		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", err.Error())
	case err = <-metricsErrCh:
		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartMetricServer", "StartKubeProxy", err.Error())
	}
	return err
}
   980  
   981  func (s *ProxyServer) birthCry() {
   982  	s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "")
   983  }
   984  
   985  // detectNodeIPs returns the proxier's "node IP" or IPs, and the IP family to use if the
   986  // node turns out to be incapable of dual-stack. (Note that kube-proxy normally runs as
   987  // dual-stack if the backend is capable of supporting both IP families, regardless of
   988  // whether the node is *actually* configured as dual-stack or not.)
   989  
   990  // (Note that on Linux, the node IPs are used only to determine whether a given
   991  // LoadBalancerSourceRanges value matches the node or not. In particular, they are *not*
   992  // used for NodePort handling.)
   993  //
   994  // The order of precedence is:
   995  //  1. if bindAddress is not 0.0.0.0 or ::, then it is used as the primary IP.
   996  //  2. if rawNodeIPs is not empty, then its address(es) is/are used
   997  //  3. otherwise the node IPs are 127.0.0.1 and ::1
   998  func detectNodeIPs(logger klog.Logger, rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) {
   999  	primaryFamily := v1.IPv4Protocol
  1000  	nodeIPs := map[v1.IPFamily]net.IP{
  1001  		v1.IPv4Protocol: net.IPv4(127, 0, 0, 1),
  1002  		v1.IPv6Protocol: net.IPv6loopback,
  1003  	}
  1004  
  1005  	if len(rawNodeIPs) > 0 {
  1006  		if !netutils.IsIPv4(rawNodeIPs[0]) {
  1007  			primaryFamily = v1.IPv6Protocol
  1008  		}
  1009  		nodeIPs[primaryFamily] = rawNodeIPs[0]
  1010  		if len(rawNodeIPs) > 1 {
  1011  			// If more than one address is returned, they are guaranteed to be of different families
  1012  			family := v1.IPv4Protocol
  1013  			if !netutils.IsIPv4(rawNodeIPs[1]) {
  1014  				family = v1.IPv6Protocol
  1015  			}
  1016  			nodeIPs[family] = rawNodeIPs[1]
  1017  		}
  1018  	}
  1019  
  1020  	// If a bindAddress is passed, override the primary IP
  1021  	bindIP := netutils.ParseIPSloppy(bindAddress)
  1022  	if bindIP != nil && !bindIP.IsUnspecified() {
  1023  		if netutils.IsIPv4(bindIP) {
  1024  			primaryFamily = v1.IPv4Protocol
  1025  		} else {
  1026  			primaryFamily = v1.IPv6Protocol
  1027  		}
  1028  		nodeIPs[primaryFamily] = bindIP
  1029  	}
  1030  
  1031  	if nodeIPs[primaryFamily].IsLoopback() {
  1032  		logger.Info("Can't determine this node's IP, assuming loopback; if this is incorrect, please set the --bind-address flag")
  1033  	}
  1034  	return primaryFamily, nodeIPs
  1035  }
  1036  
  1037  // getNodeIP returns IPs for the node with the provided name.  If
  1038  // required, it will wait for the node to be created.
  1039  func getNodeIPs(logger klog.Logger, client clientset.Interface, name string) []net.IP {
  1040  	var nodeIPs []net.IP
  1041  	backoff := wait.Backoff{
  1042  		Steps:    6,
  1043  		Duration: 1 * time.Second,
  1044  		Factor:   2.0,
  1045  		Jitter:   0.2,
  1046  	}
  1047  
  1048  	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
  1049  		node, err := client.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
  1050  		if err != nil {
  1051  			logger.Error(err, "Failed to retrieve node info")
  1052  			return false, nil
  1053  		}
  1054  		nodeIPs, err = utilnode.GetNodeHostIPs(node)
  1055  		if err != nil {
  1056  			logger.Error(err, "Failed to retrieve node IPs")
  1057  			return false, nil
  1058  		}
  1059  		return true, nil
  1060  	})
  1061  	if err == nil {
  1062  		logger.Info("Successfully retrieved node IP(s)", "IPs", nodeIPs)
  1063  	}
  1064  	return nodeIPs
  1065  }
  1066  

View as plain text