1
16
17
18
19 package app
20
21 import (
22 "context"
23 goflag "flag"
24 "fmt"
25 "net"
26 "net/http"
27 "os"
28 "strings"
29 "time"
30
31 "github.com/fsnotify/fsnotify"
32 "github.com/spf13/cobra"
33 "github.com/spf13/pflag"
34
35 v1 "k8s.io/api/core/v1"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/fields"
38 "k8s.io/apimachinery/pkg/labels"
39 "k8s.io/apimachinery/pkg/runtime"
40 "k8s.io/apimachinery/pkg/runtime/serializer"
41 "k8s.io/apimachinery/pkg/selection"
42 "k8s.io/apimachinery/pkg/types"
43 utilerrors "k8s.io/apimachinery/pkg/util/errors"
44 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
45 "k8s.io/apimachinery/pkg/util/validation/field"
46 "k8s.io/apimachinery/pkg/util/wait"
47 "k8s.io/apiserver/pkg/server/healthz"
48 "k8s.io/apiserver/pkg/server/mux"
49 "k8s.io/apiserver/pkg/server/routes"
50 utilfeature "k8s.io/apiserver/pkg/util/feature"
51 "k8s.io/client-go/informers"
52 clientset "k8s.io/client-go/kubernetes"
53 "k8s.io/client-go/rest"
54 "k8s.io/client-go/tools/clientcmd"
55 clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
56 "k8s.io/client-go/tools/events"
57 cliflag "k8s.io/component-base/cli/flag"
58 componentbaseconfig "k8s.io/component-base/config"
59 "k8s.io/component-base/configz"
60 "k8s.io/component-base/logs"
61 logsapi "k8s.io/component-base/logs/api/v1"
62 "k8s.io/component-base/metrics"
63 metricsfeatures "k8s.io/component-base/metrics/features"
64 "k8s.io/component-base/metrics/legacyregistry"
65 "k8s.io/component-base/metrics/prometheus/slis"
66 "k8s.io/component-base/version"
67 "k8s.io/component-base/version/verflag"
68 nodeutil "k8s.io/component-helpers/node/util"
69 "k8s.io/klog/v2"
70 "k8s.io/kube-proxy/config/v1alpha1"
71 api "k8s.io/kubernetes/pkg/apis/core"
72 "k8s.io/kubernetes/pkg/cluster/ports"
73 "k8s.io/kubernetes/pkg/features"
74 "k8s.io/kubernetes/pkg/kubelet/qos"
75 "k8s.io/kubernetes/pkg/proxy"
76 "k8s.io/kubernetes/pkg/proxy/apis"
77 kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
78 proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
79 kubeproxyconfigv1alpha1 "k8s.io/kubernetes/pkg/proxy/apis/config/v1alpha1"
80 "k8s.io/kubernetes/pkg/proxy/apis/config/validation"
81 "k8s.io/kubernetes/pkg/proxy/config"
82 "k8s.io/kubernetes/pkg/proxy/healthcheck"
83 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
84 "k8s.io/kubernetes/pkg/util/filesystem"
85 utilflag "k8s.io/kubernetes/pkg/util/flag"
86 utilnode "k8s.io/kubernetes/pkg/util/node"
87 "k8s.io/kubernetes/pkg/util/oom"
88 netutils "k8s.io/utils/net"
89 "k8s.io/utils/ptr"
90 )
91
92 func init() {
93 utilruntime.Must(metricsfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
94 utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
95 }
96
97
// proxyRun is the minimal behaviour Options needs from a proxy server: a
// single Run method whose returned error is forwarded to Options.errCh by
// runLoop.
type proxyRun interface {
	Run() error
}
101
102
// Options holds everything needed to create and run a ProxyServer: the values
// of the command-line flags registered by AddFlags plus the runtime state
// (config, watcher, server, error channel) built up by Complete and Run.
type Options struct {
	// ConfigFile is the path to the configuration file (--config).
	ConfigFile string

	// WriteConfigTo, if set, makes Run write the configuration to this path
	// and return instead of starting the proxy (--write-config-to).
	WriteConfigTo string

	// CleanupAndExit makes Run return right after platformCleanup (--cleanup).
	CleanupAndExit bool

	// InitAndExit is passed to newProxyServer and makes Run return once the
	// server has been constructed (--init-only).
	InitAndExit bool

	// WindowsService is passed to initForOS by NewProxyCommand.
	// NOTE(review): the flag that sets it is presumably registered by
	// addOSFlags, which is not visible here.
	WindowsService bool

	// config is the proxy server's configuration object. NewOptions fills it
	// with defaults; Complete replaces it when a config file is given.
	config *kubeproxyconfig.KubeProxyConfiguration

	// watcher watches ConfigFile for changes; it is set by initWatcher and
	// started by runLoop.
	watcher filesystem.FSWatcher

	// proxyServer is the server run by runLoop.
	proxyServer proxyRun

	// errCh receives errors from the watcher handlers and from
	// proxyServer.Run; runLoop returns on the first non-nil value.
	errCh chan error

	// The fields below are flag values that are not part of the config
	// object itself.

	// master is the API server address override (--master).
	master string

	// healthzPort is the value of the deprecated --healthz-port flag.
	healthzPort int32

	// metricsPort is the value of the deprecated --metrics-port flag.
	metricsPort int32

	// hostnameOverride is the raw --hostname-override value; it is normalised
	// into config.HostnameOverride by processHostnameOverrideFlag.
	hostnameOverride string

	// logger is used for messages emitted by Options and is handed to the
	// ProxyServer.
	logger klog.Logger
}
141
142
143 func (o *Options) AddFlags(fs *pflag.FlagSet) {
144 o.addOSFlags(fs)
145
146 fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
147 fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the default configuration values to this file and exit.")
148
149 fs.BoolVar(&o.CleanupAndExit, "cleanup", o.CleanupAndExit, "If true cleanup iptables and ipvs rules and exit.")
150
151 fs.Var(cliflag.NewMapStringBool(&o.config.FeatureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
152 "Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n")+"\n"+
153 "This parameter is ignored if a config file is specified by --config.")
154
155 fs.StringVar(&o.config.ClientConnection.Kubeconfig, "kubeconfig", o.config.ClientConnection.Kubeconfig, "Path to kubeconfig file with authorization information (the master location can be overridden by the master flag).")
156 fs.StringVar(&o.master, "master", o.master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
157 fs.StringVar(&o.config.ClientConnection.ContentType, "kube-api-content-type", o.config.ClientConnection.ContentType, "Content type of requests sent to apiserver.")
158 fs.Int32Var(&o.config.ClientConnection.Burst, "kube-api-burst", o.config.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver")
159 fs.Float32Var(&o.config.ClientConnection.QPS, "kube-api-qps", o.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver")
160
161 fs.StringVar(&o.hostnameOverride, "hostname-override", o.hostnameOverride, "If non-empty, will be used as the name of the Node that kube-proxy is running on. If unset, the node name is assumed to be the same as the node's hostname.")
162 fs.Var(&utilflag.IPVar{Val: &o.config.BindAddress}, "bind-address", "Overrides kube-proxy's idea of what its node's primary IP is. Note that the name is a historical artifact, and kube-proxy does not actually bind any sockets to this IP. This parameter is ignored if a config file is specified by --config.")
163 fs.Var(&utilflag.IPPortVar{Val: &o.config.HealthzBindAddress}, "healthz-bind-address", "The IP address and port for the health check server to serve on, defaulting to \"0.0.0.0:10256\" (if --bind-address is unset or IPv4), or \"[::]:10256\" (if --bind-address is IPv6). Set empty to disable. This parameter is ignored if a config file is specified by --config.")
164 fs.Var(&utilflag.IPPortVar{Val: &o.config.MetricsBindAddress}, "metrics-bind-address", "The IP address and port for the metrics server to serve on, defaulting to \"127.0.0.1:10249\" (if --bind-address is unset or IPv4), or \"[::1]:10249\" (if --bind-address is IPv6). (Set to \"0.0.0.0:10249\" / \"[::]:10249\" to bind on all interfaces.) Set empty to disable. This parameter is ignored if a config file is specified by --config.")
165 fs.BoolVar(&o.config.BindAddressHardFail, "bind-address-hard-fail", o.config.BindAddressHardFail, "If true kube-proxy will treat failure to bind to a port as fatal and exit")
166 fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "If true enables profiling via web interface on /debug/pprof handler. This parameter is ignored if a config file is specified by --config.")
167 fs.StringVar(&o.config.ShowHiddenMetricsForVersion, "show-hidden-metrics-for-version", o.config.ShowHiddenMetricsForVersion,
168 "The previous version for which you want to show hidden metrics. "+
169 "Only the previous minor version is meaningful, other values will not be allowed. "+
170 "The format is <major>.<minor>, e.g.: '1.16'. "+
171 "The purpose of this format is make sure you have the opportunity to notice if the next release hides additional metrics, "+
172 "rather than being surprised when they are permanently removed in the release after that. "+
173 "This parameter is ignored if a config file is specified by --config.")
174 fs.BoolVar(&o.InitAndExit, "init-only", o.InitAndExit, "If true, perform any initialization steps that must be done with full root privileges, and then exit. After doing this, you can run kube-proxy again with only the CAP_NET_ADMIN capability.")
175 fs.Var(&o.config.Mode, "proxy-mode", "Which proxy mode to use: on Linux this can be 'iptables' (default) or 'ipvs'. On Windows the only supported value is 'kernelspace'."+
176 "This parameter is ignored if a config file is specified by --config.")
177
178 fs.Int32Var(o.config.IPTables.MasqueradeBit, "iptables-masquerade-bit", ptr.Deref(o.config.IPTables.MasqueradeBit, 14), "If using the iptables or ipvs proxy mode, the bit of the fwmark space to mark packets requiring SNAT with. Must be within the range [0, 31].")
179 fs.BoolVar(&o.config.IPTables.MasqueradeAll, "masquerade-all", o.config.IPTables.MasqueradeAll, "If using the iptables or ipvs proxy mode, SNAT all traffic sent via Service cluster IPs. This may be required with some CNI plugins.")
180 fs.BoolVar(o.config.IPTables.LocalhostNodePorts, "iptables-localhost-nodeports", ptr.Deref(o.config.IPTables.LocalhostNodePorts, true), "If false, kube-proxy will disable the legacy behavior of allowing NodePort services to be accessed via localhost. (Applies only to iptables mode and IPv4; localhost NodePorts are never allowed with other proxy modes or with IPv6.)")
181 fs.DurationVar(&o.config.IPTables.SyncPeriod.Duration, "iptables-sync-period", o.config.IPTables.SyncPeriod.Duration, "An interval (e.g. '5s', '1m', '2h22m') indicating how frequently various re-synchronizing and cleanup operations are performed. Must be greater than 0.")
182 fs.DurationVar(&o.config.IPTables.MinSyncPeriod.Duration, "iptables-min-sync-period", o.config.IPTables.MinSyncPeriod.Duration, "The minimum period between iptables rule resyncs (e.g. '5s', '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will result in an immediate iptables resync.")
183
184 fs.DurationVar(&o.config.IPVS.SyncPeriod.Duration, "ipvs-sync-period", o.config.IPVS.SyncPeriod.Duration, "An interval (e.g. '5s', '1m', '2h22m') indicating how frequently various re-synchronizing and cleanup operations are performed. Must be greater than 0.")
185 fs.DurationVar(&o.config.IPVS.MinSyncPeriod.Duration, "ipvs-min-sync-period", o.config.IPVS.MinSyncPeriod.Duration, "The minimum period between IPVS rule resyncs (e.g. '5s', '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will result in an immediate IPVS resync.")
186 fs.StringVar(&o.config.IPVS.Scheduler, "ipvs-scheduler", o.config.IPVS.Scheduler, "The ipvs scheduler type when proxy mode is ipvs")
187 fs.StringSliceVar(&o.config.IPVS.ExcludeCIDRs, "ipvs-exclude-cidrs", o.config.IPVS.ExcludeCIDRs, "A comma-separated list of CIDRs which the ipvs proxier should not touch when cleaning up IPVS rules.")
188 fs.BoolVar(&o.config.IPVS.StrictARP, "ipvs-strict-arp", o.config.IPVS.StrictARP, "Enable strict ARP by setting arp_ignore to 1 and arp_announce to 2")
189 fs.DurationVar(&o.config.IPVS.TCPTimeout.Duration, "ipvs-tcp-timeout", o.config.IPVS.TCPTimeout.Duration, "The timeout for idle IPVS TCP connections, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
190 fs.DurationVar(&o.config.IPVS.TCPFinTimeout.Duration, "ipvs-tcpfin-timeout", o.config.IPVS.TCPFinTimeout.Duration, "The timeout for IPVS TCP connections after receiving a FIN packet, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
191 fs.DurationVar(&o.config.IPVS.UDPTimeout.Duration, "ipvs-udp-timeout", o.config.IPVS.UDPTimeout.Duration, "The timeout for IPVS UDP packets, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
192
193 fs.Var(&o.config.DetectLocalMode, "detect-local-mode", "Mode to use to detect local traffic. This parameter is ignored if a config file is specified by --config.")
194 fs.StringVar(&o.config.DetectLocal.BridgeInterface, "pod-bridge-interface", o.config.DetectLocal.BridgeInterface, "A bridge interface name. When --detect-local-mode is set to BridgeInterface, kube-proxy will consider traffic to be local if it originates from this bridge.")
195 fs.StringVar(&o.config.DetectLocal.InterfaceNamePrefix, "pod-interface-name-prefix", o.config.DetectLocal.InterfaceNamePrefix, "An interface name prefix. When --detect-local-mode is set to InterfaceNamePrefix, kube-proxy will consider traffic to be local if it originates from any interface whose name begins with this prefix.")
196 fs.StringVar(&o.config.ClusterCIDR, "cluster-cidr", o.config.ClusterCIDR, "The CIDR range of the pods in the cluster. (For dual-stack clusters, this can be a comma-separated dual-stack pair of CIDR ranges.). When --detect-local-mode is set to ClusterCIDR, kube-proxy will consider traffic to be local if its source IP is in this range. (Otherwise it is not used.) "+
197 "This parameter is ignored if a config file is specified by --config.")
198
199 fs.StringSliceVar(&o.config.NodePortAddresses, "nodeport-addresses", o.config.NodePortAddresses,
200 "A list of CIDR ranges that contain valid node IPs. If set, connections to NodePort services will only be accepted on node IPs in one of the indicated ranges. If unset, NodePort connections will be accepted on all local IPs. This parameter is ignored if a config file is specified by --config.")
201
202 fs.Int32Var(o.config.OOMScoreAdj, "oom-score-adj", ptr.Deref(o.config.OOMScoreAdj, int32(qos.KubeProxyOOMScoreAdj)), "The oom-score-adj value for kube-proxy process. Values must be within the range [-1000, 1000]. This parameter is ignored if a config file is specified by --config.")
203 fs.Int32Var(o.config.Conntrack.MaxPerCore, "conntrack-max-per-core", *o.config.Conntrack.MaxPerCore,
204 "Maximum number of NAT connections to track per CPU core (0 to leave the limit as-is and ignore conntrack-min).")
205 fs.Int32Var(o.config.Conntrack.Min, "conntrack-min", *o.config.Conntrack.Min,
206 "Minimum number of conntrack entries to allocate, regardless of conntrack-max-per-core (set conntrack-max-per-core=0 to leave the limit as-is).")
207
208 fs.DurationVar(&o.config.Conntrack.TCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", o.config.Conntrack.TCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
209 fs.DurationVar(
210 &o.config.Conntrack.TCPCloseWaitTimeout.Duration, "conntrack-tcp-timeout-close-wait",
211 o.config.Conntrack.TCPCloseWaitTimeout.Duration,
212 "NAT timeout for TCP connections in the CLOSE_WAIT state")
213 fs.BoolVar(&o.config.Conntrack.TCPBeLiberal, "conntrack-tcp-be-liberal", o.config.Conntrack.TCPBeLiberal, "Enable liberal mode for tracking TCP packets by setting nf_conntrack_tcp_be_liberal to 1")
214 fs.DurationVar(&o.config.Conntrack.UDPTimeout.Duration, "conntrack-udp-timeout", o.config.Conntrack.UDPTimeout.Duration, "Idle timeout for UNREPLIED UDP connections (0 to leave as-is)")
215 fs.DurationVar(&o.config.Conntrack.UDPStreamTimeout.Duration, "conntrack-udp-timeout-stream", o.config.Conntrack.UDPStreamTimeout.Duration, "Idle timeout for ASSURED UDP connections (0 to leave as-is)")
216
217 fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
218
219 fs.Int32Var(&o.healthzPort, "healthz-port", o.healthzPort, "The port to bind the health check server. Use 0 to disable.")
220 _ = fs.MarkDeprecated("healthz-port", "This flag is deprecated and will be removed in a future release. Please use --healthz-bind-address instead.")
221 fs.Int32Var(&o.metricsPort, "metrics-port", o.metricsPort, "The port to bind the metrics server. Use 0 to disable.")
222 _ = fs.MarkDeprecated("metrics-port", "This flag is deprecated and will be removed in a future release. Please use --metrics-bind-address instead.")
223 fs.Var(utilflag.PortRangeVar{Val: &o.config.PortRange}, "proxy-port-range", "This was previously used to configure the userspace proxy, but is now unused.")
224 _ = fs.MarkDeprecated("proxy-port-range", "This flag has no effect and will be removed in a future release.")
225
226 logsapi.AddFlags(&o.config.Logging, fs)
227 }
228
229
230 func newKubeProxyConfiguration() *kubeproxyconfig.KubeProxyConfiguration {
231 versionedConfig := &v1alpha1.KubeProxyConfiguration{}
232 proxyconfigscheme.Scheme.Default(versionedConfig)
233 internalConfig, err := proxyconfigscheme.Scheme.ConvertToVersion(versionedConfig, kubeproxyconfig.SchemeGroupVersion)
234 if err != nil {
235 panic(fmt.Sprintf("Unable to create default config: %v", err))
236 }
237
238 return internalConfig.(*kubeproxyconfig.KubeProxyConfiguration)
239 }
240
241
242 func NewOptions() *Options {
243 return &Options{
244 config: newKubeProxyConfiguration(),
245 healthzPort: ports.ProxyHealthzPort,
246 metricsPort: ports.ProxyStatusPort,
247 errCh: make(chan error),
248 logger: klog.FromContext(context.Background()),
249 }
250 }
251
252
253 func (o *Options) Complete(fs *pflag.FlagSet) error {
254 if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
255 o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)
256 o.config.MetricsBindAddress = addressFromDeprecatedFlags(o.config.MetricsBindAddress, o.metricsPort)
257 }
258
259
260 if len(o.ConfigFile) > 0 {
261 c, err := o.loadConfigFromFile(o.ConfigFile)
262 if err != nil {
263 return err
264 }
265
266
267
268
269
270
271
272 copyLogsFromFlags(fs, &c.Logging)
273 o.config = c
274
275 if err := o.initWatcher(); err != nil {
276 return err
277 }
278 }
279
280 o.platformApplyDefaults(o.config)
281
282 if err := o.processHostnameOverrideFlag(); err != nil {
283 return err
284 }
285
286 return utilfeature.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates)
287 }
288
289
290
291
292
293
294
295 func copyLogsFromFlags(from *pflag.FlagSet, to *logsapi.LoggingConfiguration) error {
296 var cloneFS pflag.FlagSet
297 logsapi.AddFlags(to, &cloneFS)
298 vmodule := to.VModule
299 to.VModule = nil
300 var err error
301 cloneFS.VisitAll(func(f *pflag.Flag) {
302 if err != nil {
303 return
304 }
305 fsFlag := from.Lookup(f.Name)
306 if fsFlag == nil {
307 err = fmt.Errorf("logging flag %s not found in flag set", f.Name)
308 return
309 }
310 if !fsFlag.Changed {
311 return
312 }
313 if setErr := f.Value.Set(fsFlag.Value.String()); setErr != nil {
314 err = fmt.Errorf("copying flag %s value: %v", f.Name, setErr)
315 return
316 }
317 })
318 to.VModule = append(to.VModule, vmodule...)
319 return err
320 }
321
322
323 func (o *Options) initWatcher() error {
324 fswatcher := filesystem.NewFsnotifyWatcher()
325 err := fswatcher.Init(o.eventHandler, o.errorHandler)
326 if err != nil {
327 return err
328 }
329 err = fswatcher.AddWatch(o.ConfigFile)
330 if err != nil {
331 return err
332 }
333 o.watcher = fswatcher
334 return nil
335 }
336
337 func (o *Options) eventHandler(ent fsnotify.Event) {
338 if ent.Has(fsnotify.Write) || ent.Has(fsnotify.Rename) {
339
340 o.errCh <- fmt.Errorf("content of the proxy server's configuration file was updated")
341 return
342 }
343 o.errCh <- nil
344 }
345
// errorHandler is the watcher callback for watcher errors; it forwards err
// unchanged to o.errCh.
func (o *Options) errorHandler(err error) {
	o.errCh <- err
}
349
350
351 func (o *Options) processHostnameOverrideFlag() error {
352
353 if len(o.hostnameOverride) > 0 {
354 hostName := strings.TrimSpace(o.hostnameOverride)
355 if len(hostName) == 0 {
356 return fmt.Errorf("empty hostname-override is invalid")
357 }
358 o.config.HostnameOverride = strings.ToLower(hostName)
359 }
360
361 return nil
362 }
363
364
365 func (o *Options) Validate() error {
366 if errs := validation.Validate(o.config); len(errs) != 0 {
367 return errs.ToAggregate()
368 }
369
370 return nil
371 }
372
373
374 func (o *Options) Run() error {
375 defer close(o.errCh)
376 if len(o.WriteConfigTo) > 0 {
377 return o.writeConfigFile()
378 }
379
380 err := platformCleanup(o.config.Mode, o.CleanupAndExit)
381 if o.CleanupAndExit {
382 return err
383 }
384
385
386
387 proxyServer, err := newProxyServer(o.logger, o.config, o.master, o.InitAndExit)
388 if err != nil {
389 return err
390 }
391 if o.InitAndExit {
392 return nil
393 }
394
395 o.proxyServer = proxyServer
396 return o.runLoop()
397 }
398
399
400
401 func (o *Options) runLoop() error {
402 if o.watcher != nil {
403 o.watcher.Run()
404 }
405
406
407 go func() {
408 err := o.proxyServer.Run()
409 o.errCh <- err
410 }()
411
412 for {
413 err := <-o.errCh
414 if err != nil {
415 return err
416 }
417 }
418 }
419
420 func (o *Options) writeConfigFile() (err error) {
421 const mediaType = runtime.ContentTypeYAML
422 info, ok := runtime.SerializerInfoForMediaType(proxyconfigscheme.Codecs.SupportedMediaTypes(), mediaType)
423 if !ok {
424 return fmt.Errorf("unable to locate encoder -- %q is not a supported media type", mediaType)
425 }
426
427 encoder := proxyconfigscheme.Codecs.EncoderForVersion(info.Serializer, v1alpha1.SchemeGroupVersion)
428
429 configFile, err := os.Create(o.WriteConfigTo)
430 if err != nil {
431 return err
432 }
433
434 defer func() {
435 ferr := configFile.Close()
436 if ferr != nil && err == nil {
437 err = ferr
438 }
439 }()
440
441 if err = encoder.Encode(o.config, configFile); err != nil {
442 return err
443 }
444
445 o.logger.Info("Wrote configuration", "file", o.WriteConfigTo)
446
447 return nil
448 }
449
450
451
452
453
454 func addressFromDeprecatedFlags(addr string, port int32) string {
455 if port == 0 {
456 return ""
457 }
458 return proxyutil.AppendPortIfNeeded(addr, port)
459 }
460
461
462
463 func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
464 lenientScheme := runtime.NewScheme()
465 if err := kubeproxyconfig.AddToScheme(lenientScheme); err != nil {
466 return nil, nil, fmt.Errorf("failed to add kube-proxy config API to lenient scheme: %v", err)
467 }
468 if err := kubeproxyconfigv1alpha1.AddToScheme(lenientScheme); err != nil {
469 return nil, nil, fmt.Errorf("failed to add kube-proxy config v1alpha1 API to lenient scheme: %v", err)
470 }
471 lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict)
472 return lenientScheme, &lenientCodecs, nil
473 }
474
475
476
477 func (o *Options) loadConfigFromFile(file string) (*kubeproxyconfig.KubeProxyConfiguration, error) {
478 data, err := os.ReadFile(file)
479 if err != nil {
480 return nil, err
481 }
482
483 return o.loadConfig(data)
484 }
485
486
487 func (o *Options) loadConfig(data []byte) (*kubeproxyconfig.KubeProxyConfiguration, error) {
488
489 configObj, gvk, err := proxyconfigscheme.Codecs.UniversalDecoder().Decode(data, nil, nil)
490 if err != nil {
491
492
493
494 if !runtime.IsStrictDecodingError(err) {
495 return nil, fmt.Errorf("failed to decode: %w", err)
496 }
497
498 _, lenientCodecs, lenientErr := newLenientSchemeAndCodecs()
499 if lenientErr != nil {
500 return nil, lenientErr
501 }
502
503 configObj, gvk, lenientErr = lenientCodecs.UniversalDecoder().Decode(data, nil, nil)
504 if lenientErr != nil {
505
506
507 return nil, fmt.Errorf("failed lenient decoding: %v", err)
508 }
509
510
511 o.logger.Info("Using lenient decoding as strict decoding failed", "err", err)
512 }
513
514 proxyConfig, ok := configObj.(*kubeproxyconfig.KubeProxyConfiguration)
515 if !ok {
516 return nil, fmt.Errorf("got unexpected config type: %v", gvk)
517 }
518 return proxyConfig, nil
519 }
520
521
522 func NewProxyCommand() *cobra.Command {
523 opts := NewOptions()
524
525 cmd := &cobra.Command{
526 Use: "kube-proxy",
527 Long: `The Kubernetes network proxy runs on each node. This
528 reflects services as defined in the Kubernetes API on each node and can do simple
529 TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends.
530 Service cluster IPs and ports are currently found through Docker-links-compatible
531 environment variables specifying ports opened by the service proxy. There is an optional
532 addon that provides cluster DNS for these cluster IPs. The user must create a service
533 with the apiserver API to configure the proxy.`,
534 RunE: func(cmd *cobra.Command, args []string) error {
535 verflag.PrintAndExitIfRequested()
536
537 if err := initForOS(opts.WindowsService); err != nil {
538 return fmt.Errorf("failed os init: %w", err)
539 }
540
541 if err := opts.Complete(cmd.Flags()); err != nil {
542 return fmt.Errorf("failed complete: %w", err)
543 }
544
545 logs.InitLogs()
546 if err := logsapi.ValidateAndApplyAsField(&opts.config.Logging, utilfeature.DefaultFeatureGate, field.NewPath("logging")); err != nil {
547 return fmt.Errorf("initialize logging: %v", err)
548 }
549
550 cliflag.PrintFlags(cmd.Flags())
551
552 if err := opts.Validate(); err != nil {
553 return fmt.Errorf("failed validate: %w", err)
554 }
555
556 utilfeature.DefaultMutableFeatureGate.AddMetrics()
557 if err := opts.Run(); err != nil {
558 opts.logger.Error(err, "Error running ProxyServer")
559 return err
560 }
561
562 return nil
563 },
564 Args: func(cmd *cobra.Command, args []string) error {
565 for _, arg := range args {
566 if len(arg) > 0 {
567 return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
568 }
569 }
570 return nil
571 },
572 }
573
574 fs := cmd.Flags()
575 opts.AddFlags(fs)
576 fs.AddGoFlagSet(goflag.CommandLine)
577
578 _ = cmd.MarkFlagFilename("config", "yaml", "yml", "json")
579
580 return cmd
581 }
582
583
584
// ProxyServer holds everything needed to run the proxy: its configuration,
// API client, event plumbing, node identity and the platform-specific
// Proxier. Instances are built by newProxyServer.
type ProxyServer struct {
	// Config is the configuration the server was created with.
	Config *kubeproxyconfig.KubeProxyConfiguration

	// Client is the Kubernetes API client created by createClient.
	Client clientset.Interface
	// Broadcaster publishes events to the API server's events/v1 sink.
	Broadcaster events.EventBroadcaster
	// Recorder records events under the "kube-proxy" component name.
	Recorder events.EventRecorder
	// NodeRef references the Node this proxy runs on; newProxyServer sets
	// both its Name and its UID from Hostname.
	NodeRef *v1.ObjectReference
	// HealthzServer is set only when a healthz bind address is configured.
	HealthzServer *healthcheck.ProxierHealthServer
	// Hostname is the node name, resolved via nodeutil.GetHostname from
	// the configured hostname override.
	Hostname string
	// PrimaryIPFamily and NodeIPs are the results of detectNodeIPs.
	PrimaryIPFamily v1.IPFamily
	NodeIPs         map[v1.IPFamily]net.IP

	// podCIDRs is checked against the primary IP family by checkIPConfig.
	// NOTE(review): it is not assigned in the visible part of the file;
	// presumably it is filled in by platform-specific setup — confirm.
	podCIDRs []string

	// Proxier is the proxy implementation returned by createProxier.
	Proxier proxy.Provider

	// logger is the logger handed to newProxyServer.
	logger klog.Logger
}
603
604
605 func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
606 s := &ProxyServer{
607 Config: config,
608 logger: logger,
609 }
610
611 cz, err := configz.New(kubeproxyconfig.GroupName)
612 if err != nil {
613 return nil, fmt.Errorf("unable to register configz: %s", err)
614 }
615 cz.Set(config)
616
617 if len(config.ShowHiddenMetricsForVersion) > 0 {
618 metrics.SetShowHidden()
619 }
620
621 s.Hostname, err = nodeutil.GetHostname(config.HostnameOverride)
622 if err != nil {
623 return nil, err
624 }
625
626 s.Client, err = createClient(logger, config.ClientConnection, master)
627 if err != nil {
628 return nil, err
629 }
630
631 rawNodeIPs := getNodeIPs(logger, s.Client, s.Hostname)
632 s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(logger, rawNodeIPs, config.BindAddress)
633
634 s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()})
635 s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")
636
637 s.NodeRef = &v1.ObjectReference{
638 Kind: "Node",
639 Name: s.Hostname,
640 UID: types.UID(s.Hostname),
641 Namespace: "",
642 }
643
644 if len(config.HealthzBindAddress) > 0 {
645 s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
646 }
647
648 err = s.platformSetup()
649 if err != nil {
650 return nil, err
651 }
652
653 ipv4Supported, ipv6Supported, dualStackSupported, err := s.platformCheckSupported()
654 if err != nil {
655 return nil, err
656 } else if (s.PrimaryIPFamily == v1.IPv4Protocol && !ipv4Supported) || (s.PrimaryIPFamily == v1.IPv6Protocol && !ipv6Supported) {
657 return nil, fmt.Errorf("no support for primary IP family %q", s.PrimaryIPFamily)
658 } else if dualStackSupported {
659 logger.Info("kube-proxy running in dual-stack mode", "primary ipFamily", s.PrimaryIPFamily)
660 } else {
661 logger.Info("kube-proxy running in single-stack mode", "ipFamily", s.PrimaryIPFamily)
662 }
663
664 err, fatal := checkIPConfig(s, dualStackSupported)
665 if err != nil {
666 if fatal {
667 return nil, fmt.Errorf("kube-proxy configuration is incorrect: %v", err)
668 }
669 logger.Error(err, "Kube-proxy configuration may be incomplete or incorrect")
670 }
671
672 s.Proxier, err = s.createProxier(config, dualStackSupported, initOnly)
673 if err != nil {
674 return nil, err
675 }
676
677 return s, nil
678 }
679
680
681 func checkIPConfig(s *ProxyServer, dualStackSupported bool) (error, bool) {
682 var errors []error
683 var badFamily netutils.IPFamily
684
685 if s.PrimaryIPFamily == v1.IPv4Protocol {
686 badFamily = netutils.IPv6
687 } else {
688 badFamily = netutils.IPv4
689 }
690
691 var clusterType string
692 if dualStackSupported {
693 clusterType = fmt.Sprintf("%s-primary", s.PrimaryIPFamily)
694 } else {
695 clusterType = fmt.Sprintf("%s-only", s.PrimaryIPFamily)
696 }
697
698
699
700
701 fatal := false
702
703 if s.Config.ClusterCIDR != "" {
704 clusterCIDRs := strings.Split(s.Config.ClusterCIDR, ",")
705 if badCIDRs(clusterCIDRs, badFamily) {
706 errors = append(errors, fmt.Errorf("cluster is %s but clusterCIDRs contains only IPv%s addresses", clusterType, badFamily))
707 if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeClusterCIDR && !dualStackSupported {
708
709 fatal = true
710 }
711 }
712 }
713
714 if badCIDRs(s.Config.NodePortAddresses, badFamily) {
715 errors = append(errors, fmt.Errorf("cluster is %s but nodePortAddresses contains only IPv%s addresses", clusterType, badFamily))
716 }
717
718 if badCIDRs(s.podCIDRs, badFamily) {
719 errors = append(errors, fmt.Errorf("cluster is %s but node.spec.podCIDRs contains only IPv%s addresses", clusterType, badFamily))
720 if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
721
722 fatal = true
723 }
724 }
725
726 if netutils.IPFamilyOfString(s.Config.Winkernel.SourceVip) == badFamily {
727 errors = append(errors, fmt.Errorf("cluster is %s but winkernel.sourceVip is IPv%s", clusterType, badFamily))
728 }
729
730
731
732 if !dualStackSupported {
733 if badCIDRs(s.Config.IPVS.ExcludeCIDRs, badFamily) {
734 errors = append(errors, fmt.Errorf("cluster is %s but ipvs.excludeCIDRs contains only IPv%s addresses", clusterType, badFamily))
735 }
736
737 if badBindAddress(s.Config.HealthzBindAddress, badFamily) {
738 errors = append(errors, fmt.Errorf("cluster is %s but healthzBindAddress is IPv%s", clusterType, badFamily))
739 }
740 if badBindAddress(s.Config.MetricsBindAddress, badFamily) {
741 errors = append(errors, fmt.Errorf("cluster is %s but metricsBindAddress is IPv%s", clusterType, badFamily))
742 }
743 }
744
745 return utilerrors.NewAggregate(errors), fatal
746 }
747
748
749 func badCIDRs(cidrs []string, wrongFamily netutils.IPFamily) bool {
750 if len(cidrs) == 0 {
751 return false
752 }
753 for _, cidr := range cidrs {
754 if netutils.IPFamilyOfCIDRString(cidr) != wrongFamily {
755 return false
756 }
757 }
758 return true
759 }
760
761
762
763 func badBindAddress(bindAddress string, wrongFamily netutils.IPFamily) bool {
764 if host, _, _ := net.SplitHostPort(bindAddress); host != "" {
765 ip := netutils.ParseIPSloppy(host)
766 if ip != nil && netutils.IPFamilyOf(ip) == wrongFamily && !ip.IsUnspecified() {
767 return true
768 }
769 }
770 return false
771 }
772
773
774
775 func createClient(logger klog.Logger, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
776 var kubeConfig *rest.Config
777 var err error
778
779 if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
780 logger.Info("Neither kubeconfig file nor master URL was specified, falling back to in-cluster config")
781 kubeConfig, err = rest.InClusterConfig()
782 } else {
783
784
785 kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
786 &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
787 &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
788 }
789 if err != nil {
790 return nil, err
791 }
792
793 kubeConfig.AcceptContentTypes = config.AcceptContentTypes
794 kubeConfig.ContentType = config.ContentType
795 kubeConfig.QPS = config.QPS
796 kubeConfig.Burst = int(config.Burst)
797
798 client, err := clientset.NewForConfig(kubeConfig)
799 if err != nil {
800 return nil, err
801 }
802
803 return client, nil
804 }
805
806 func serveHealthz(logger klog.Logger, hz *healthcheck.ProxierHealthServer, errCh chan error) {
807 if hz == nil {
808 return
809 }
810
811 fn := func() {
812 err := hz.Run()
813 if err != nil {
814 logger.Error(err, "Healthz server failed")
815 if errCh != nil {
816 errCh <- fmt.Errorf("healthz server failed: %v", err)
817
818 blockCh := make(chan error)
819 <-blockCh
820 }
821 } else {
822 logger.Error(nil, "Healthz server returned without error")
823 }
824 }
825 go wait.Until(fn, 5*time.Second, wait.NeverStop)
826 }
827
828 func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
829 if len(bindAddress) == 0 {
830 return
831 }
832
833 proxyMux := mux.NewPathRecorderMux("kube-proxy")
834 healthz.InstallHandler(proxyMux)
835 slis.SLIMetricsWithReset{}.Install(proxyMux)
836
837 proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
838 w.Header().Set("Content-Type", "text/plain; charset=utf-8")
839 w.Header().Set("X-Content-Type-Options", "nosniff")
840 fmt.Fprintf(w, "%s", proxyMode)
841 })
842
843 proxyMux.Handle("/metrics", legacyregistry.Handler())
844
845 if enableProfiling {
846 routes.Profiling{}.Install(proxyMux)
847 routes.DebugFlags{}.Install(proxyMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
848 }
849
850 configz.InstallHandler(proxyMux)
851
852 fn := func() {
853 err := http.ListenAndServe(bindAddress, proxyMux)
854 if err != nil {
855 err = fmt.Errorf("starting metrics server failed: %v", err)
856 utilruntime.HandleError(err)
857 if errCh != nil {
858 errCh <- err
859
860 blockCh := make(chan error)
861 <-blockCh
862 }
863 }
864 }
865 go wait.Until(fn, 5*time.Second, wait.NeverStop)
866 }
867
868
869
// Run starts the proxy server: it logs version and runtime settings, applies
// the configured OOM score adjustment, starts event recording, the healthz
// and metrics servers, the Service / EndpointSlice / (optionally) ServiceCIDR
// and Node informers, and the proxier sync loop. It then blocks until the
// healthz or metrics server reports a failure and returns that error. It also
// returns early with an error if a label requirement cannot be built.
func (s *ProxyServer) Run() error {
	// Log the version first to help debugging.
	s.logger.Info("Version info", "version", version.Get())

	// Log the Go runtime tuning knobs as set in the environment.
	s.logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	// Apply the OOM score adjustment if one is configured. A failure here is
	// only logged (at verbosity 2); startup continues.
	var oomAdjuster *oom.OOMAdjuster
	if s.Config.OOMScoreAdj != nil {
		oomAdjuster = oom.NewOOMAdjuster()
		if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.Config.OOMScoreAdj)); err != nil {
			s.logger.V(2).Info("Failed to apply OOMScore", "err", err)
		}
	}

	// Start recording events. stopCh is never closed in this function.
	if s.Broadcaster != nil {
		stopCh := make(chan struct{})
		s.Broadcaster.StartRecordingToSink(stopCh)
	}

	// The error channels are only created in hard-fail mode. Otherwise they
	// stay nil, which the serve* helpers treat as "log and retry" and which
	// makes the final select below block forever.
	var healthzErrCh, metricsErrCh chan error
	if s.Config.BindAddressHardFail {
		healthzErrCh = make(chan error)
		metricsErrCh = make(chan error)
	}

	// Start up a healthz server (no-op when s.HealthzServer is nil).
	serveHealthz(s.logger, s.HealthzServer, healthzErrCh)

	// Start up a metrics server (no-op when MetricsBindAddress is empty).
	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)

	// Select only objects that do NOT carry the service-proxy-name label...
	noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
	if err != nil {
		return err
	}

	// ...and do NOT carry the headless-service label.
	noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
	if err != nil {
		return err
	}

	labelSelector := labels.NewSelector()
	labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

	// Make informers that apply the label selector above to every list/watch.
	informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.Config.ConfigSyncPeriod.Duration,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.LabelSelector = labelSelector.String()
		}))

	// Create the configs for Services, EndpointSlices and (when the feature
	// gate is enabled) ServiceCIDRs, and register the proxier as their event
	// handler. Registration happens before informerFactory.Start below;
	// NOTE(review): presumably so that no initial events are missed — confirm
	// against the config package.
	serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
	serviceConfig.RegisterEventHandler(s.Proxier)
	go serviceConfig.Run(wait.NeverStop)

	endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
	endpointSliceConfig.RegisterEventHandler(s.Proxier)
	go endpointSliceConfig.Run(wait.NeverStop)

	if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
		serviceCIDRConfig := config.NewServiceCIDRConfig(informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
		serviceCIDRConfig.RegisterEventHandler(s.Proxier)
		go serviceCIDRConfig.Run(wait.NeverStop)
	}

	// Start the shared informers only after all of the configs above have
	// been created.
	informerFactory.Start(wait.NeverStop)

	// Make a separate informer factory that selects only this node, by name.
	currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.Config.ConfigSyncPeriod.Duration,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
		}))
	nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration)
	// Register the pod-CIDR handler only in NodeCIDR local-detection mode.
	if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
		nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(s.podCIDRs))
	}
	// Register the node-eligibility handler when the draining feature gate is on.
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeProxyDrainingTerminatingNodes) {
		nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
			HealthServer: s.HealthzServer,
		})
	}
	nodeConfig.RegisterEventHandler(s.Proxier)

	go nodeConfig.Run(wait.NeverStop)

	// Start the node informer only after NewNodeConfig and all handler
	// registrations above.
	currentNodeInformerFactory.Start(wait.NeverStop)

	// Emit the "Starting" event now that everything has been set up.
	s.birthCry()

	go s.Proxier.SyncLoop()

	// Block until one of the servers reports a failure. With nil channels
	// (hard-fail mode off) neither case is ever ready, so this never returns.
	select {
	case err = <-healthzErrCh:
		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", err.Error())
	case err = <-metricsErrCh:
		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartMetricServer", "StartKubeProxy", err.Error())
	}
	return err
}
980
981 func (s *ProxyServer) birthCry() {
982 s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "")
983 }
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998 func detectNodeIPs(logger klog.Logger, rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) {
999 primaryFamily := v1.IPv4Protocol
1000 nodeIPs := map[v1.IPFamily]net.IP{
1001 v1.IPv4Protocol: net.IPv4(127, 0, 0, 1),
1002 v1.IPv6Protocol: net.IPv6loopback,
1003 }
1004
1005 if len(rawNodeIPs) > 0 {
1006 if !netutils.IsIPv4(rawNodeIPs[0]) {
1007 primaryFamily = v1.IPv6Protocol
1008 }
1009 nodeIPs[primaryFamily] = rawNodeIPs[0]
1010 if len(rawNodeIPs) > 1 {
1011
1012 family := v1.IPv4Protocol
1013 if !netutils.IsIPv4(rawNodeIPs[1]) {
1014 family = v1.IPv6Protocol
1015 }
1016 nodeIPs[family] = rawNodeIPs[1]
1017 }
1018 }
1019
1020
1021 bindIP := netutils.ParseIPSloppy(bindAddress)
1022 if bindIP != nil && !bindIP.IsUnspecified() {
1023 if netutils.IsIPv4(bindIP) {
1024 primaryFamily = v1.IPv4Protocol
1025 } else {
1026 primaryFamily = v1.IPv6Protocol
1027 }
1028 nodeIPs[primaryFamily] = bindIP
1029 }
1030
1031 if nodeIPs[primaryFamily].IsLoopback() {
1032 logger.Info("Can't determine this node's IP, assuming loopback; if this is incorrect, please set the --bind-address flag")
1033 }
1034 return primaryFamily, nodeIPs
1035 }
1036
1037
1038
1039 func getNodeIPs(logger klog.Logger, client clientset.Interface, name string) []net.IP {
1040 var nodeIPs []net.IP
1041 backoff := wait.Backoff{
1042 Steps: 6,
1043 Duration: 1 * time.Second,
1044 Factor: 2.0,
1045 Jitter: 0.2,
1046 }
1047
1048 err := wait.ExponentialBackoff(backoff, func() (bool, error) {
1049 node, err := client.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
1050 if err != nil {
1051 logger.Error(err, "Failed to retrieve node info")
1052 return false, nil
1053 }
1054 nodeIPs, err = utilnode.GetNodeHostIPs(node)
1055 if err != nil {
1056 logger.Error(err, "Failed to retrieve node IPs")
1057 return false, nil
1058 }
1059 return true, nil
1060 })
1061 if err == nil {
1062 logger.Info("Successfully retrieved node IP(s)", "IPs", nodeIPs)
1063 }
1064 return nodeIPs
1065 }
1066
View as plain text