1
2
3
4
19
20
21
22 package app
23
24 import (
25 "context"
26 "errors"
27 "fmt"
28 goruntime "runtime"
29 "strings"
30 "time"
31
32 "github.com/google/cadvisor/machine"
33 "github.com/google/cadvisor/utils/sysfs"
34
35 v1 "k8s.io/api/core/v1"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/fields"
38 "k8s.io/apimachinery/pkg/runtime"
39 "k8s.io/apimachinery/pkg/watch"
40 utilfeature "k8s.io/apiserver/pkg/util/feature"
41 clientset "k8s.io/client-go/kubernetes"
42 "k8s.io/client-go/tools/cache"
43 toolswatch "k8s.io/client-go/tools/watch"
44 utilsysctl "k8s.io/component-helpers/node/util/sysctl"
45 "k8s.io/klog/v2"
46 "k8s.io/kubernetes/pkg/features"
47 "k8s.io/kubernetes/pkg/proxy"
48 proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
49 "k8s.io/kubernetes/pkg/proxy/iptables"
50 "k8s.io/kubernetes/pkg/proxy/ipvs"
51 utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
52 utilipvs "k8s.io/kubernetes/pkg/proxy/ipvs/util"
53 proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics"
54 "k8s.io/kubernetes/pkg/proxy/nftables"
55 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
56 proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
57 utiliptables "k8s.io/kubernetes/pkg/util/iptables"
58 "k8s.io/utils/exec"
59 )
60
61
62
// timeoutForNodePodCIDR bounds how long waitForPodCIDR waits for the node to
// report its pod CIDRs before giving up.
// NOTE(review): presumably a var rather than a const so tests can shorten it — confirm.
var timeoutForNodePodCIDR = 5 * time.Minute
64
65
66
67 func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfiguration) {
68 if config.Mode == "" {
69 o.logger.Info("Using iptables proxy")
70 config.Mode = proxyconfigapi.ProxyModeIPTables
71 }
72
73 if config.DetectLocalMode == "" {
74 o.logger.V(4).Info("Defaulting detect-local-mode", "localModeClusterCIDR", string(proxyconfigapi.LocalModeClusterCIDR))
75 config.DetectLocalMode = proxyconfigapi.LocalModeClusterCIDR
76 }
77 o.logger.V(2).Info("DetectLocalMode", "localMode", string(config.DetectLocalMode))
78 }
79
80
81
82
83 func (s *ProxyServer) platformSetup() error {
84 if s.Config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
85 s.logger.Info("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname)
86 node, err := waitForPodCIDR(s.Client, s.Hostname)
87 if err != nil {
88 return err
89 }
90 s.podCIDRs = node.Spec.PodCIDRs
91 s.logger.Info("NodeInfo", "podCIDRs", node.Spec.PodCIDRs)
92 }
93
94 err := s.setupConntrack()
95 if err != nil {
96 return err
97 }
98
99 proxymetrics.RegisterMetrics()
100 return nil
101 }
102
103
104 func isIPTablesBased(mode proxyconfigapi.ProxyMode) bool {
105 return mode == proxyconfigapi.ProxyModeIPTables || mode == proxyconfigapi.ProxyModeIPVS
106 }
107
108
109
110
111 func getIPTables(primaryFamily v1.IPFamily) ([2]utiliptables.Interface, utiliptables.Interface) {
112 execer := exec.New()
113
114
115 ipt := [2]utiliptables.Interface{
116 utiliptables.New(execer, utiliptables.ProtocolIPv4),
117 utiliptables.New(execer, utiliptables.ProtocolIPv6),
118 }
119
120 var iptInterface utiliptables.Interface
121 if primaryFamily == v1.IPv4Protocol {
122 iptInterface = ipt[0]
123 } else if primaryFamily == v1.IPv6Protocol {
124 iptInterface = ipt[1]
125 }
126
127 return ipt, iptInterface
128 }
129
130
131
132 func (s *ProxyServer) platformCheckSupported() (ipv4Supported, ipv6Supported, dualStackSupported bool, err error) {
133 if isIPTablesBased(s.Config.Mode) {
134 ipt, _ := getIPTables(v1.IPFamilyUnknown)
135 ipv4Supported = ipt[0].Present()
136 ipv6Supported = ipt[1].Present()
137
138 if !ipv4Supported && !ipv6Supported {
139 err = fmt.Errorf("iptables is not available on this host")
140 } else if !ipv4Supported {
141 s.logger.Info("No iptables support for family", "ipFamily", v1.IPv4Protocol)
142 } else if !ipv6Supported {
143 s.logger.Info("No iptables support for family", "ipFamily", v1.IPv6Protocol)
144 }
145 } else {
146
147
148 ipv4Supported, ipv6Supported = true, true
149 }
150
151
152
153 dualStackSupported = ipv4Supported && ipv6Supported
154 return
155 }
156
157
158 func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration, dualStack, initOnly bool) (proxy.Provider, error) {
159 var proxier proxy.Provider
160 var localDetectors [2]proxyutiliptables.LocalTrafficDetector
161 var localDetector proxyutiliptables.LocalTrafficDetector
162 var err error
163
164 if config.Mode == proxyconfigapi.ProxyModeIPTables {
165 s.logger.Info("Using iptables Proxier")
166
167 if dualStack {
168 ipt, _ := getIPTables(s.PrimaryIPFamily)
169
170 localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
171 if err != nil {
172 return nil, fmt.Errorf("unable to create proxier: %v", err)
173 }
174
175
176 proxier, err = iptables.NewDualStackProxier(
177 ipt,
178 utilsysctl.New(),
179 exec.New(),
180 config.IPTables.SyncPeriod.Duration,
181 config.IPTables.MinSyncPeriod.Duration,
182 config.IPTables.MasqueradeAll,
183 *config.IPTables.LocalhostNodePorts,
184 int(*config.IPTables.MasqueradeBit),
185 localDetectors,
186 s.Hostname,
187 s.NodeIPs,
188 s.Recorder,
189 s.HealthzServer,
190 config.NodePortAddresses,
191 initOnly,
192 )
193 } else {
194
195 _, iptInterface := getIPTables(s.PrimaryIPFamily)
196 localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
197 if err != nil {
198 return nil, fmt.Errorf("unable to create proxier: %v", err)
199 }
200
201
202 proxier, err = iptables.NewProxier(
203 s.PrimaryIPFamily,
204 iptInterface,
205 utilsysctl.New(),
206 exec.New(),
207 config.IPTables.SyncPeriod.Duration,
208 config.IPTables.MinSyncPeriod.Duration,
209 config.IPTables.MasqueradeAll,
210 *config.IPTables.LocalhostNodePorts,
211 int(*config.IPTables.MasqueradeBit),
212 localDetector,
213 s.Hostname,
214 s.NodeIPs[s.PrimaryIPFamily],
215 s.Recorder,
216 s.HealthzServer,
217 config.NodePortAddresses,
218 initOnly,
219 )
220 }
221
222 if err != nil {
223 return nil, fmt.Errorf("unable to create proxier: %v", err)
224 }
225 } else if config.Mode == proxyconfigapi.ProxyModeIPVS {
226 execer := exec.New()
227 ipsetInterface := utilipset.New(execer)
228 ipvsInterface := utilipvs.New()
229 if err := ipvs.CanUseIPVSProxier(ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil {
230 return nil, fmt.Errorf("can't use the IPVS proxier: %v", err)
231 }
232
233 s.logger.Info("Using ipvs Proxier")
234 if dualStack {
235 ipt, _ := getIPTables(s.PrimaryIPFamily)
236
237
238 localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
239 if err != nil {
240 return nil, fmt.Errorf("unable to create proxier: %v", err)
241 }
242
243 proxier, err = ipvs.NewDualStackProxier(
244 ipt,
245 ipvsInterface,
246 ipsetInterface,
247 utilsysctl.New(),
248 execer,
249 config.IPVS.SyncPeriod.Duration,
250 config.IPVS.MinSyncPeriod.Duration,
251 config.IPVS.ExcludeCIDRs,
252 config.IPVS.StrictARP,
253 config.IPVS.TCPTimeout.Duration,
254 config.IPVS.TCPFinTimeout.Duration,
255 config.IPVS.UDPTimeout.Duration,
256 config.IPTables.MasqueradeAll,
257 int(*config.IPTables.MasqueradeBit),
258 localDetectors,
259 s.Hostname,
260 s.NodeIPs,
261 s.Recorder,
262 s.HealthzServer,
263 config.IPVS.Scheduler,
264 config.NodePortAddresses,
265 initOnly,
266 )
267 } else {
268 _, iptInterface := getIPTables(s.PrimaryIPFamily)
269 localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
270 if err != nil {
271 return nil, fmt.Errorf("unable to create proxier: %v", err)
272 }
273
274 proxier, err = ipvs.NewProxier(
275 s.PrimaryIPFamily,
276 iptInterface,
277 ipvsInterface,
278 ipsetInterface,
279 utilsysctl.New(),
280 execer,
281 config.IPVS.SyncPeriod.Duration,
282 config.IPVS.MinSyncPeriod.Duration,
283 config.IPVS.ExcludeCIDRs,
284 config.IPVS.StrictARP,
285 config.IPVS.TCPTimeout.Duration,
286 config.IPVS.TCPFinTimeout.Duration,
287 config.IPVS.UDPTimeout.Duration,
288 config.IPTables.MasqueradeAll,
289 int(*config.IPTables.MasqueradeBit),
290 localDetector,
291 s.Hostname,
292 s.NodeIPs[s.PrimaryIPFamily],
293 s.Recorder,
294 s.HealthzServer,
295 config.IPVS.Scheduler,
296 config.NodePortAddresses,
297 initOnly,
298 )
299 }
300 if err != nil {
301 return nil, fmt.Errorf("unable to create proxier: %v", err)
302 }
303 } else if config.Mode == proxyconfigapi.ProxyModeNFTables {
304 s.logger.Info("Using nftables Proxier")
305
306 if dualStack {
307 localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
308 if err != nil {
309 return nil, fmt.Errorf("unable to create proxier: %v", err)
310 }
311
312
313 proxier, err = nftables.NewDualStackProxier(
314 utilsysctl.New(),
315 config.NFTables.SyncPeriod.Duration,
316 config.NFTables.MinSyncPeriod.Duration,
317 config.NFTables.MasqueradeAll,
318 int(*config.NFTables.MasqueradeBit),
319 localDetectors,
320 s.Hostname,
321 s.NodeIPs,
322 s.Recorder,
323 s.HealthzServer,
324 config.NodePortAddresses,
325 initOnly,
326 )
327 } else {
328
329 localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
330 if err != nil {
331 return nil, fmt.Errorf("unable to create proxier: %v", err)
332 }
333
334
335 proxier, err = nftables.NewProxier(
336 s.PrimaryIPFamily,
337 utilsysctl.New(),
338 config.NFTables.SyncPeriod.Duration,
339 config.NFTables.MinSyncPeriod.Duration,
340 config.NFTables.MasqueradeAll,
341 int(*config.NFTables.MasqueradeBit),
342 localDetector,
343 s.Hostname,
344 s.NodeIPs[s.PrimaryIPFamily],
345 s.Recorder,
346 s.HealthzServer,
347 config.NodePortAddresses,
348 initOnly,
349 )
350 }
351
352 if err != nil {
353 return nil, fmt.Errorf("unable to create proxier: %v", err)
354 }
355 }
356
357 return proxier, nil
358 }
359
360 func (s *ProxyServer) setupConntrack() error {
361 ct := &realConntracker{
362 logger: s.logger,
363 }
364
365 max, err := getConntrackMax(s.logger, s.Config.Conntrack)
366 if err != nil {
367 return err
368 }
369 if max > 0 {
370 err := ct.SetMax(max)
371 if err != nil {
372 if err != errReadOnlySysFS {
373 return err
374 }
375
376
377
378
379
380
381
382 const message = "CRI error: /sys is read-only: " +
383 "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
384 s.Recorder.Eventf(s.NodeRef, nil, v1.EventTypeWarning, err.Error(), "StartKubeProxy", message)
385 }
386 }
387
388 if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 {
389 timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second)
390 if err := ct.SetTCPEstablishedTimeout(timeout); err != nil {
391 return err
392 }
393 }
394
395 if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 {
396 timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second)
397 if err := ct.SetTCPCloseWaitTimeout(timeout); err != nil {
398 return err
399 }
400 }
401
402 if s.Config.Conntrack.TCPBeLiberal {
403 if err := ct.SetTCPBeLiberal(1); err != nil {
404 return err
405 }
406 }
407
408 if s.Config.Conntrack.UDPTimeout.Duration > 0 {
409 timeout := int(s.Config.Conntrack.UDPTimeout.Duration / time.Second)
410 if err := ct.SetUDPTimeout(timeout); err != nil {
411 return err
412 }
413 }
414
415 if s.Config.Conntrack.UDPStreamTimeout.Duration > 0 {
416 timeout := int(s.Config.Conntrack.UDPStreamTimeout.Duration / time.Second)
417 if err := ct.SetUDPStreamTimeout(timeout); err != nil {
418 return err
419 }
420 }
421
422 return nil
423 }
424
425 func getConntrackMax(logger klog.Logger, config proxyconfigapi.KubeProxyConntrackConfiguration) (int, error) {
426 if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
427 floor := 0
428 if config.Min != nil {
429 floor = int(*config.Min)
430 }
431 scaled := int(*config.MaxPerCore) * detectNumCPU()
432 if scaled > floor {
433 logger.V(3).Info("GetConntrackMax: using scaled conntrack-max-per-core")
434 return scaled, nil
435 }
436 logger.V(3).Info("GetConntrackMax: using conntrack-min")
437 return floor, nil
438 }
439 return 0, nil
440 }
441
442 func waitForPodCIDR(client clientset.Interface, nodeName string) (*v1.Node, error) {
443
444
445 ctx, cancelFunc := context.WithTimeout(context.TODO(), timeoutForNodePodCIDR)
446 defer cancelFunc()
447
448 fieldSelector := fields.OneTermEqualSelector("metadata.name", nodeName).String()
449 lw := &cache.ListWatch{
450 ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
451 options.FieldSelector = fieldSelector
452 return client.CoreV1().Nodes().List(ctx, options)
453 },
454 WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
455 options.FieldSelector = fieldSelector
456 return client.CoreV1().Nodes().Watch(ctx, options)
457 },
458 }
459 condition := func(event watch.Event) (bool, error) {
460
461 if event.Type != watch.Modified && event.Type != watch.Added {
462 return false, nil
463 }
464
465 n, ok := event.Object.(*v1.Node)
466 if !ok {
467 return false, fmt.Errorf("event object not of type Node")
468 }
469
470 if !n.DeletionTimestamp.IsZero() {
471 return false, nil
472 }
473 return n.Spec.PodCIDR != "" && len(n.Spec.PodCIDRs) > 0, nil
474 }
475
476 evt, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition)
477 if err != nil {
478 return nil, fmt.Errorf("timeout waiting for PodCIDR allocation to configure detect-local-mode %v: %v", proxyconfigapi.LocalModeNodeCIDR, err)
479 }
480 if n, ok := evt.Object.(*v1.Node); ok {
481 return n, nil
482 }
483 return nil, fmt.Errorf("event object not of type node")
484 }
485
486 func detectNumCPU() int {
487
488 _, numCPU, err := machine.GetTopology(sysfs.NewRealSysFs())
489 if err != nil || numCPU < 1 {
490 return goruntime.NumCPU()
491 }
492 return numCPU
493 }
494
495 func getLocalDetector(logger klog.Logger, ipFamily v1.IPFamily, mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, nodePodCIDRs []string) (proxyutiliptables.LocalTrafficDetector, error) {
496 switch mode {
497 case proxyconfigapi.LocalModeClusterCIDR:
498
499
500 clusterCIDRs := strings.TrimSpace(config.ClusterCIDR)
501 if len(clusterCIDRs) == 0 {
502 logger.Info("Detect-local-mode set to ClusterCIDR, but no cluster CIDR defined")
503 break
504 }
505
506 cidrsByFamily := proxyutil.MapCIDRsByIPFamily(strings.Split(clusterCIDRs, ","))
507 if len(cidrsByFamily[ipFamily]) != 0 {
508 return proxyutiliptables.NewDetectLocalByCIDR(cidrsByFamily[ipFamily][0].String())
509 }
510
511 logger.Info("Detect-local-mode set to ClusterCIDR, but no cluster CIDR for family", "ipFamily", ipFamily)
512
513 case proxyconfigapi.LocalModeNodeCIDR:
514 cidrsByFamily := proxyutil.MapCIDRsByIPFamily(nodePodCIDRs)
515 if len(cidrsByFamily[ipFamily]) != 0 {
516 return proxyutiliptables.NewDetectLocalByCIDR(cidrsByFamily[ipFamily][0].String())
517 }
518
519 logger.Info("Detect-local-mode set to NodeCIDR, but no PodCIDR defined at node for family", "ipFamily", ipFamily)
520
521 case proxyconfigapi.LocalModeBridgeInterface:
522 return proxyutiliptables.NewDetectLocalByBridgeInterface(config.DetectLocal.BridgeInterface)
523
524 case proxyconfigapi.LocalModeInterfaceNamePrefix:
525 return proxyutiliptables.NewDetectLocalByInterfaceNamePrefix(config.DetectLocal.InterfaceNamePrefix)
526 }
527
528 logger.Info("Defaulting to no-op detect-local")
529 return proxyutiliptables.NewNoOpLocalDetector(), nil
530 }
531
532 func getDualStackLocalDetectorTuple(logger klog.Logger, mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, nodePodCIDRs []string) ([2]proxyutiliptables.LocalTrafficDetector, error) {
533 var localDetectors [2]proxyutiliptables.LocalTrafficDetector
534 var err error
535
536 localDetectors[0], err = getLocalDetector(logger, v1.IPv4Protocol, mode, config, nodePodCIDRs)
537 if err != nil {
538 return localDetectors, err
539 }
540 localDetectors[1], err = getLocalDetector(logger, v1.IPv6Protocol, mode, config, nodePodCIDRs)
541 if err != nil {
542 return localDetectors, err
543 }
544 return localDetectors, nil
545 }
546
547
548
549
550
551 func platformCleanup(mode proxyconfigapi.ProxyMode, cleanupAndExit bool) error {
552 var encounteredError bool
553
554
555 if !isIPTablesBased(mode) || cleanupAndExit {
556 ipts, _ := getIPTables(v1.IPFamilyUnknown)
557 execer := exec.New()
558 ipsetInterface := utilipset.New(execer)
559 ipvsInterface := utilipvs.New()
560
561 for _, ipt := range ipts {
562 encounteredError = iptables.CleanupLeftovers(ipt) || encounteredError
563 encounteredError = ipvs.CleanupLeftovers(ipvsInterface, ipt, ipsetInterface) || encounteredError
564 }
565 }
566
567 if utilfeature.DefaultFeatureGate.Enabled(features.NFTablesProxyMode) {
568
569 if isIPTablesBased(mode) || cleanupAndExit {
570 encounteredError = nftables.CleanupLeftovers() || encounteredError
571 }
572 }
573
574 if encounteredError {
575 return errors.New("encountered an error while tearing down rules")
576 }
577 return nil
578 }
579
View as plain text