1
2
3
4
19
20 package iptables
21
22
23
24
25
26 import (
27 "bytes"
28 "crypto/sha256"
29 "encoding/base32"
30 "fmt"
31 "net"
32 "reflect"
33 "strconv"
34 "strings"
35 "sync"
36 "sync/atomic"
37 "time"
38
39 v1 "k8s.io/api/core/v1"
40 discovery "k8s.io/api/discovery/v1"
41 "k8s.io/apimachinery/pkg/types"
42 "k8s.io/apimachinery/pkg/util/sets"
43 "k8s.io/apimachinery/pkg/util/wait"
44 "k8s.io/client-go/tools/events"
45 utilsysctl "k8s.io/component-helpers/node/util/sysctl"
46 "k8s.io/klog/v2"
47 "k8s.io/kubernetes/pkg/proxy"
48 "k8s.io/kubernetes/pkg/proxy/conntrack"
49 "k8s.io/kubernetes/pkg/proxy/healthcheck"
50 "k8s.io/kubernetes/pkg/proxy/metaproxier"
51 "k8s.io/kubernetes/pkg/proxy/metrics"
52 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
53 proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
54 "k8s.io/kubernetes/pkg/util/async"
55 utiliptables "k8s.io/kubernetes/pkg/util/iptables"
56 utilexec "k8s.io/utils/exec"
57 )
58
59 const (
60
61 kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
62
63
64 kubeExternalServicesChain utiliptables.Chain = "KUBE-EXTERNAL-SERVICES"
65
66
67 kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
68
69
70 kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
71
72
73 kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
74
75
76 kubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
77
78
79 kubeProxyFirewallChain utiliptables.Chain = "KUBE-PROXY-FIREWALL"
80
81
82 kubeProxyCanaryChain utiliptables.Chain = "KUBE-PROXY-CANARY"
83
84
85
86
87 kubeletFirewallChain utiliptables.Chain = "KUBE-FIREWALL"
88
89
90
91
92 largeClusterEndpointsThreshold = 1000
93 )
94
// Sysctl keys consulted by NewProxier.
const (
	// sysctlRouteLocalnet is set to 1 when localhost NodePorts are enabled.
	sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
	// sysctlNFConntrackTCPBeLiberal is read to decide whether conntrack is in
	// "liberal" TCP mode.
	sysctlNFConntrackTCPBeLiberal = "net/netfilter/nf_conntrack_tcp_be_liberal"
)
97
98
99 func NewDualStackProxier(
100 ipt [2]utiliptables.Interface,
101 sysctl utilsysctl.Interface,
102 exec utilexec.Interface,
103 syncPeriod time.Duration,
104 minSyncPeriod time.Duration,
105 masqueradeAll bool,
106 localhostNodePorts bool,
107 masqueradeBit int,
108 localDetectors [2]proxyutiliptables.LocalTrafficDetector,
109 hostname string,
110 nodeIPs map[v1.IPFamily]net.IP,
111 recorder events.EventRecorder,
112 healthzServer *healthcheck.ProxierHealthServer,
113 nodePortAddresses []string,
114 initOnly bool,
115 ) (proxy.Provider, error) {
116
117 ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], sysctl,
118 exec, syncPeriod, minSyncPeriod, masqueradeAll, localhostNodePorts, masqueradeBit, localDetectors[0], hostname,
119 nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
120 if err != nil {
121 return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
122 }
123
124 ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], sysctl,
125 exec, syncPeriod, minSyncPeriod, masqueradeAll, false, masqueradeBit, localDetectors[1], hostname,
126 nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
127 if err != nil {
128 return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
129 }
130 if initOnly {
131 return nil, nil
132 }
133 return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
134 }
135
136
137
// Proxier is an iptables-based proxy for a single IP family. NewProxier
// constructs it; NewDualStackProxier pairs an IPv4 and an IPv6 instance.
type Proxier struct {
	// ipFamily is the IP family this proxier was created for; it is reported to
	// the healthz server on updates.
	ipFamily v1.IPFamily

	// endpointsChanges and serviceChanges accumulate the events delivered to the
	// OnEndpointSlice* / OnService* handlers. syncProxyRules applies them to
	// endpointsMap and svcPortMap.
	endpointsChanges *proxy.EndpointsChangeTracker
	serviceChanges   *proxy.ServiceChangeTracker

	// mu is held for the whole of syncProxyRules and around the handlers'
	// writes to the sync flags and nodeLabels below.
	mu           sync.Mutex
	svcPortMap   proxy.ServicePortMap
	endpointsMap proxy.EndpointsMap
	nodeLabels   map[string]string // copy of this node's labels, kept by the OnNode* handlers

	// endpointSlicesSynced and servicesSynced are set by the corresponding
	// On*Synced handlers; each handler sets the initialized flag to the value
	// of the other one.
	endpointSlicesSynced bool
	servicesSynced       bool
	// needFullSync disables the partial-sync attempt in syncProxyRules; it starts
	// true and is set again on node label changes, forced syncs and failed syncs.
	needFullSync bool
	// initialized is read and written atomically (see setInitialized and
	// isInitialized); non-zero means syncing is allowed.
	initialized int32
	syncRunner  *async.BoundedFrequencyRunner // runs syncProxyRules
	// syncPeriod is passed to the iptables monitor and used as the retry delay
	// after a failed sync.
	syncPeriod time.Duration
	// NOTE(review): not referenced in the visible part of the file; presumably
	// the time of the last stale-chain cleanup - confirm.
	lastIPTablesCleanup time.Time

	// The fields below are set once by NewProxier.
	iptables      utiliptables.Interface
	masqueradeAll bool
	// masqueradeMark is the hex-formatted value of 1<<masqueradeBit used in the
	// KUBE-POSTROUTING and KUBE-MARK-MASQ rules.
	masqueradeMark string
	conntrack      conntrack.Interface
	localDetector  proxyutiliptables.LocalTrafficDetector
	// hostname is compared against node.Name in the OnNode* handlers.
	hostname string
	nodeIP   net.IP
	recorder events.EventRecorder

	serviceHealthServer healthcheck.ServiceHealthServer
	healthzServer       *healthcheck.ProxierHealthServer

	// precomputedProbabilities caches computeProbability(i) at index i. Index 0
	// holds the placeholder "<bad value>". The slice grows on demand in
	// probability().
	precomputedProbabilities []string

	// The buffers below are kept on the struct and reset at the start of each
	// sync rather than allocated per sync.
	iptablesData             *bytes.Buffer
	existingFilterChainsData *bytes.Buffer
	filterChains             proxyutil.LineBuffer
	filterRules              proxyutil.LineBuffer
	natChains                proxyutil.LineBuffer
	natRules                 proxyutil.LineBuffer

	// largeClusterMode is recomputed on every sync: true when the total number
	// of endpoints exceeds largeClusterEndpointsThreshold. While it is set,
	// appendServiceCommentLocked omits rule comments.
	largeClusterMode bool

	// localhostNodePorts is true only if it was requested and the NodePort
	// address selection contains an IPv4 loopback address.
	localhostNodePorts bool

	// conntrackTCPLiberal records that the nf_conntrack_tcp_be_liberal sysctl
	// was readable and non-zero at start-up.
	conntrackTCPLiberal bool

	// nodePortAddresses is built from the NodePort address strings passed to
	// NewProxier.
	nodePortAddresses *proxyutil.NodePortAddresses

	// networkInterfacer is set to proxyutil.RealNetwork{} by NewProxier.
	// NOTE(review): presumably a seam for faking host interfaces in tests - confirm.
	networkInterfacer proxyutil.NetworkInterfacer
}
209
210
211 var _ proxy.Provider = &Proxier{}
212
213
214
215
216
217
218 func NewProxier(ipFamily v1.IPFamily,
219 ipt utiliptables.Interface,
220 sysctl utilsysctl.Interface,
221 exec utilexec.Interface,
222 syncPeriod time.Duration,
223 minSyncPeriod time.Duration,
224 masqueradeAll bool,
225 localhostNodePorts bool,
226 masqueradeBit int,
227 localDetector proxyutiliptables.LocalTrafficDetector,
228 hostname string,
229 nodeIP net.IP,
230 recorder events.EventRecorder,
231 healthzServer *healthcheck.ProxierHealthServer,
232 nodePortAddressStrings []string,
233 initOnly bool,
234 ) (*Proxier, error) {
235 nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings, nil)
236
237 if !nodePortAddresses.ContainsIPv4Loopback() {
238 localhostNodePorts = false
239 }
240 if localhostNodePorts {
241
242
243 klog.InfoS("Setting route_localnet=1 to allow node-ports on localhost; to change this either disable iptables.localhostNodePorts (--iptables-localhost-nodeports) or set nodePortAddresses (--nodeport-addresses) to filter loopback addresses")
244 if err := proxyutil.EnsureSysctl(sysctl, sysctlRouteLocalnet, 1); err != nil {
245 return nil, err
246 }
247 }
248
249
250
251
252 conntrackTCPLiberal := false
253 if val, err := sysctl.GetSysctl(sysctlNFConntrackTCPBeLiberal); err == nil && val != 0 {
254 conntrackTCPLiberal = true
255 klog.InfoS("nf_conntrack_tcp_be_liberal set, not installing DROP rules for INVALID packets")
256 }
257
258 if initOnly {
259 klog.InfoS("System initialized and --init-only specified")
260 return nil, nil
261 }
262
263
264 masqueradeValue := 1 << uint(masqueradeBit)
265 masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
266 klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
267
268 serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
269
270 proxier := &Proxier{
271 ipFamily: ipFamily,
272 svcPortMap: make(proxy.ServicePortMap),
273 serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
274 endpointsMap: make(proxy.EndpointsMap),
275 endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
276 needFullSync: true,
277 syncPeriod: syncPeriod,
278 iptables: ipt,
279 masqueradeAll: masqueradeAll,
280 masqueradeMark: masqueradeMark,
281 conntrack: conntrack.NewExec(exec),
282 localDetector: localDetector,
283 hostname: hostname,
284 nodeIP: nodeIP,
285 recorder: recorder,
286 serviceHealthServer: serviceHealthServer,
287 healthzServer: healthzServer,
288 precomputedProbabilities: make([]string, 0, 1001),
289 iptablesData: bytes.NewBuffer(nil),
290 existingFilterChainsData: bytes.NewBuffer(nil),
291 filterChains: proxyutil.NewLineBuffer(),
292 filterRules: proxyutil.NewLineBuffer(),
293 natChains: proxyutil.NewLineBuffer(),
294 natRules: proxyutil.NewLineBuffer(),
295 localhostNodePorts: localhostNodePorts,
296 nodePortAddresses: nodePortAddresses,
297 networkInterfacer: proxyutil.RealNetwork{},
298 conntrackTCPLiberal: conntrackTCPLiberal,
299 }
300
301 burstSyncs := 2
302 klog.V(2).InfoS("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
303
304
305
306 proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
307
308 go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
309 proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)
310
311 if ipt.HasRandomFully() {
312 klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
313 } else {
314 klog.V(2).InfoS("Iptables does not support --random-fully", "ipFamily", ipt.Protocol())
315 }
316
317 return proxier, nil
318 }
319
320
// servicePortInfo extends proxy.BaseServicePortInfo with the iptables chain
// names derived for one service port. newServiceInfo fills in every field.
type servicePortInfo struct {
	*proxy.BaseServicePortInfo
	// nameString is the cached string form of the service port name; it is the
	// input to the chain-name hashes and is used in rule comments.
	nameString             string
	clusterPolicyChainName utiliptables.Chain // "KUBE-SVC-" chain name
	localPolicyChainName   utiliptables.Chain // "KUBE-SVL-" chain name
	firewallChainName      utiliptables.Chain // "KUBE-FW-" chain name
	externalChainName      utiliptables.Chain // "KUBE-EXT-" chain name
}
330
331
332 func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
333 svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
334
335
336 svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
337 svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
338 protocol := strings.ToLower(string(svcPort.Protocol()))
339 svcPort.nameString = svcPortName.String()
340 svcPort.clusterPolicyChainName = servicePortPolicyClusterChain(svcPort.nameString, protocol)
341 svcPort.localPolicyChainName = servicePortPolicyLocalChainName(svcPort.nameString, protocol)
342 svcPort.firewallChainName = serviceFirewallChainName(svcPort.nameString, protocol)
343 svcPort.externalChainName = serviceExternalChainName(svcPort.nameString, protocol)
344
345 return svcPort
346 }
347
348
// endpointInfo extends proxy.BaseEndpointInfo with the name of the endpoint's
// own iptables chain.
type endpointInfo struct {
	*proxy.BaseEndpointInfo

	// ChainName is the "KUBE-SEP-" chain name computed by newEndpointInfo from
	// the service port name, protocol and endpoint string.
	ChainName utiliptables.Chain
}
354
355
356 func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
357 return &endpointInfo{
358 BaseEndpointInfo: baseInfo,
359 ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()),
360 }
361 }
362
// iptablesJumpChain describes one jump rule that links a chain owned by this
// proxier into another chain. The field order matters: the tables in this
// file use unkeyed literals.
type iptablesJumpChain struct {
	table    utiliptables.Table // table holding both chains
	dstChain utiliptables.Chain // chain that is ensured to exist and used as the "-j" target
	srcChain utiliptables.Chain // chain the jump rule is installed in
	comment  string             // "--comment" text; syncProxyRules omits the comment match when empty
	extraArgs []string          // match arguments placed before the comment and "-j"
}
370
371 var iptablesJumpChains = []iptablesJumpChain{
372 {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
373 {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainForward, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
374 {utiliptables.TableFilter, kubeNodePortsChain, utiliptables.ChainInput, "kubernetes health check service ports", nil},
375 {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
376 {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
377 {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
378 {utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainInput, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}},
379 {utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainOutput, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}},
380 {utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainForward, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}},
381 {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil},
382 {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil},
383 {utiliptables.TableNAT, kubePostroutingChain, utiliptables.ChainPostrouting, "kubernetes postrouting rules", nil},
384 }
385
386
387
388 var iptablesKubeletJumpChains = []iptablesJumpChain{
389 {utiliptables.TableFilter, kubeletFirewallChain, utiliptables.ChainInput, "", nil},
390 {utiliptables.TableFilter, kubeletFirewallChain, utiliptables.ChainOutput, "", nil},
391 }
392
393
394
395 var iptablesCleanupOnlyChains = []iptablesJumpChain{}
396
397
398
399 func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
400
401 for _, jump := range append(iptablesJumpChains, iptablesCleanupOnlyChains...) {
402 args := append(jump.extraArgs,
403 "-m", "comment", "--comment", jump.comment,
404 "-j", string(jump.dstChain),
405 )
406 if err := ipt.DeleteRule(jump.table, jump.srcChain, args...); err != nil {
407 if !utiliptables.IsNotFoundError(err) {
408 klog.ErrorS(err, "Error removing pure-iptables proxy rule")
409 encounteredError = true
410 }
411 }
412 }
413
414
415 iptablesData := bytes.NewBuffer(nil)
416 if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil {
417 klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableNAT)
418 encounteredError = true
419 } else {
420 existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
421 natChains := proxyutil.NewLineBuffer()
422 natRules := proxyutil.NewLineBuffer()
423 natChains.Write("*nat")
424
425 for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} {
426 if existingNATChains.Has(chain) {
427 chainString := string(chain)
428 natChains.Write(utiliptables.MakeChainLine(chain))
429 natRules.Write("-X", chainString)
430 }
431 }
432
433 for chain := range existingNATChains {
434 chainString := string(chain)
435 if isServiceChainName(chainString) {
436 natChains.Write(utiliptables.MakeChainLine(chain))
437 natRules.Write("-X", chainString)
438 }
439 }
440 natRules.Write("COMMIT")
441 natLines := append(natChains.Bytes(), natRules.Bytes()...)
442
443 err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
444 if err != nil {
445 klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableNAT)
446 metrics.IptablesRestoreFailuresTotal.Inc()
447 encounteredError = true
448 }
449 }
450
451
452 iptablesData.Reset()
453 if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil {
454 klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableFilter)
455 encounteredError = true
456 } else {
457 existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
458 filterChains := proxyutil.NewLineBuffer()
459 filterRules := proxyutil.NewLineBuffer()
460 filterChains.Write("*filter")
461 for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
462 if existingFilterChains.Has(chain) {
463 chainString := string(chain)
464 filterChains.Write(utiliptables.MakeChainLine(chain))
465 filterRules.Write("-X", chainString)
466 }
467 }
468 filterRules.Write("COMMIT")
469 filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
470
471 if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
472 klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableFilter)
473 metrics.IptablesRestoreFailuresTotal.Inc()
474 encounteredError = true
475 }
476 }
477 return encounteredError
478 }
479
// computeProbability returns 1/n formatted with exactly ten digits after the
// decimal point.
func computeProbability(n int) string {
	fraction := 1.0 / float64(n)
	return fmt.Sprintf("%0.10f", fraction)
}
483
484
485 func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) {
486 if len(proxier.precomputedProbabilities) == 0 {
487 proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>")
488 }
489 for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ {
490 proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i))
491 }
492 }
493
494
495 func (proxier *Proxier) probability(n int) string {
496 if n >= len(proxier.precomputedProbabilities) {
497 proxier.precomputeProbabilities(n)
498 }
499 return proxier.precomputedProbabilities[n]
500 }
501
502
503 func (proxier *Proxier) Sync() {
504 if proxier.healthzServer != nil {
505 proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
506 }
507 metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
508 proxier.syncRunner.Run()
509 }
510
511
512 func (proxier *Proxier) SyncLoop() {
513
514 if proxier.healthzServer != nil {
515 proxier.healthzServer.Updated(proxier.ipFamily)
516 }
517
518
519 metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
520 proxier.syncRunner.Loop(wait.NeverStop)
521 }
522
523 func (proxier *Proxier) setInitialized(value bool) {
524 var initialized int32
525 if value {
526 initialized = 1
527 }
528 atomic.StoreInt32(&proxier.initialized, initialized)
529 }
530
531 func (proxier *Proxier) isInitialized() bool {
532 return atomic.LoadInt32(&proxier.initialized) > 0
533 }
534
535
536
537 func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
538 proxier.OnServiceUpdate(nil, service)
539 }
540
541
542
543 func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
544 if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
545 proxier.Sync()
546 }
547 }
548
549
550
551 func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
552 proxier.OnServiceUpdate(service, nil)
553
554 }
555
556
557
558 func (proxier *Proxier) OnServiceSynced() {
559 proxier.mu.Lock()
560 proxier.servicesSynced = true
561 proxier.setInitialized(proxier.endpointSlicesSynced)
562 proxier.mu.Unlock()
563
564
565 proxier.syncProxyRules()
566 }
567
568
569
570 func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
571 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
572 proxier.Sync()
573 }
574 }
575
576
577
578 func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
579 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
580 proxier.Sync()
581 }
582 }
583
584
585
586 func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
587 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
588 proxier.Sync()
589 }
590 }
591
592
593
594 func (proxier *Proxier) OnEndpointSlicesSynced() {
595 proxier.mu.Lock()
596 proxier.endpointSlicesSynced = true
597 proxier.setInitialized(proxier.servicesSynced)
598 proxier.mu.Unlock()
599
600
601 proxier.syncProxyRules()
602 }
603
604
605
606 func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
607 if node.Name != proxier.hostname {
608 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
609 "eventNode", node.Name, "currentNode", proxier.hostname)
610 return
611 }
612
613 if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
614 return
615 }
616
617 proxier.mu.Lock()
618 proxier.nodeLabels = map[string]string{}
619 for k, v := range node.Labels {
620 proxier.nodeLabels[k] = v
621 }
622 proxier.needFullSync = true
623 proxier.mu.Unlock()
624 klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
625
626 proxier.Sync()
627 }
628
629
630
631 func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
632 if node.Name != proxier.hostname {
633 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
634 "eventNode", node.Name, "currentNode", proxier.hostname)
635 return
636 }
637
638 if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
639 return
640 }
641
642 proxier.mu.Lock()
643 proxier.nodeLabels = map[string]string{}
644 for k, v := range node.Labels {
645 proxier.nodeLabels[k] = v
646 }
647 proxier.needFullSync = true
648 proxier.mu.Unlock()
649 klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
650
651 proxier.Sync()
652 }
653
654
655
656 func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
657 if node.Name != proxier.hostname {
658 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
659 "eventNode", node.Name, "currentNode", proxier.hostname)
660 return
661 }
662
663 proxier.mu.Lock()
664 proxier.nodeLabels = nil
665 proxier.needFullSync = true
666 proxier.mu.Unlock()
667
668 proxier.Sync()
669 }
670
671
672
673 func (proxier *Proxier) OnNodeSynced() {
674 }
675
676
677
678 func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
679
680
681
682
683
// portProtoHash returns the first 16 characters of the standard base32
// encoding of the SHA-256 digest of servicePortName concatenated with
// protocol.
func portProtoHash(servicePortName string, protocol string) string {
	material := make([]byte, 0, len(servicePortName)+len(protocol))
	material = append(material, servicePortName...)
	material = append(material, protocol...)

	digest := sha256.Sum256(material)
	return base32.StdEncoding.EncodeToString(digest[:])[:16]
}
689
// Prefixes of the per-service and per-endpoint chain names. isServiceChainName
// recognises a chain as proxier-owned by matching these prefixes.
const servicePortPolicyClusterChainNamePrefix = "KUBE-SVC-"
const servicePortPolicyLocalChainNamePrefix = "KUBE-SVL-"
const serviceFirewallChainNamePrefix = "KUBE-FW-"
const serviceExternalChainNamePrefix = "KUBE-EXT-"
const servicePortEndpointChainNamePrefix = "KUBE-SEP-"
697
698
699
700
701 func servicePortPolicyClusterChain(servicePortName string, protocol string) utiliptables.Chain {
702 return utiliptables.Chain(servicePortPolicyClusterChainNamePrefix + portProtoHash(servicePortName, protocol))
703 }
704
705
706
707
708 func servicePortPolicyLocalChainName(servicePortName string, protocol string) utiliptables.Chain {
709 return utiliptables.Chain(servicePortPolicyLocalChainNamePrefix + portProtoHash(servicePortName, protocol))
710 }
711
712
713
714 func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain {
715 return utiliptables.Chain(serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol))
716 }
717
718
719
720
721
722 func serviceExternalChainName(servicePortName string, protocol string) utiliptables.Chain {
723 return utiliptables.Chain(serviceExternalChainNamePrefix + portProtoHash(servicePortName, protocol))
724 }
725
726
727
728 func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain {
729 hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint))
730 encoded := base32.StdEncoding.EncodeToString(hash[:])
731 return utiliptables.Chain(servicePortEndpointChainNamePrefix + encoded[:16])
732 }
733
734 func isServiceChainName(chainString string) bool {
735 prefixes := []string{
736 servicePortPolicyClusterChainNamePrefix,
737 servicePortPolicyLocalChainNamePrefix,
738 servicePortEndpointChainNamePrefix,
739 serviceFirewallChainNamePrefix,
740 serviceExternalChainNamePrefix,
741 }
742
743 for _, p := range prefixes {
744 if strings.HasPrefix(chainString, p) {
745 return true
746 }
747 }
748 return false
749 }
750
751
752 func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string) []string {
753
754
755
756 if proxier.largeClusterMode {
757 return args
758 }
759 return append(args, "-m", "comment", "--comment", svcName)
760 }
761
762
763
764
765 func (proxier *Proxier) forceSyncProxyRules() {
766 proxier.mu.Lock()
767 proxier.needFullSync = true
768 proxier.mu.Unlock()
769
770 proxier.syncProxyRules()
771 }
772
773
774
775
776 func (proxier *Proxier) syncProxyRules() {
777 proxier.mu.Lock()
778 defer proxier.mu.Unlock()
779
780
781 if !proxier.isInitialized() {
782 klog.V(2).InfoS("Not syncing iptables until Services and Endpoints have been received from master")
783 return
784 }
785
786
787
788 tryPartialSync := !proxier.needFullSync
789
790
791 start := time.Now()
792 defer func() {
793 metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
794 if tryPartialSync {
795 metrics.SyncPartialProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
796 } else {
797 metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
798 }
799 klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
800 }()
801
802 serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
803 endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
804
805 klog.V(2).InfoS("Syncing iptables rules")
806
807 success := false
808 defer func() {
809 if !success {
810 klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
811 proxier.syncRunner.RetryAfter(proxier.syncPeriod)
812 if tryPartialSync {
813 metrics.IptablesPartialRestoreFailuresTotal.Inc()
814 }
815
816
817
818 proxier.needFullSync = true
819 }
820 }()
821
822 if !tryPartialSync {
823
824
825
826
827
828
829
830
831
832
833
834 for _, jump := range append(iptablesJumpChains, iptablesKubeletJumpChains...) {
835 if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
836 klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
837 return
838 }
839 args := jump.extraArgs
840 if jump.comment != "" {
841 args = append(args, "-m", "comment", "--comment", jump.comment)
842 }
843 args = append(args, "-j", string(jump.dstChain))
844 if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
845 klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
846 return
847 }
848 }
849 }
850
851
852
853
854
855
856
857 proxier.filterChains.Reset()
858 proxier.filterRules.Reset()
859 proxier.natChains.Reset()
860 proxier.natRules.Reset()
861
862 skippedNatChains := proxyutil.NewDiscardLineBuffer()
863 skippedNatRules := proxyutil.NewDiscardLineBuffer()
864
865
866 for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeProxyFirewallChain} {
867 proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
868 }
869 for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} {
870 proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
871 }
872
873
874
875
876
877 proxier.natRules.Write(
878 "-A", string(kubePostroutingChain),
879 "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
880 "-j", "RETURN",
881 )
882
883 proxier.natRules.Write(
884 "-A", string(kubePostroutingChain),
885 "-j", "MARK", "--xor-mark", proxier.masqueradeMark,
886 )
887 masqRule := []string{
888 "-A", string(kubePostroutingChain),
889 "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
890 "-j", "MASQUERADE",
891 }
892 if proxier.iptables.HasRandomFully() {
893 masqRule = append(masqRule, "--random-fully")
894 }
895 proxier.natRules.Write(masqRule)
896
897
898
899
900 proxier.natRules.Write(
901 "-A", string(kubeMarkMasqChain),
902 "-j", "MARK", "--or-mark", proxier.masqueradeMark,
903 )
904
905 isIPv6 := proxier.iptables.IsIPv6()
906 if !isIPv6 && proxier.localhostNodePorts {
907
908
909
910
911
912
913
914
915
916
917 proxier.filterChains.Write(utiliptables.MakeChainLine(kubeletFirewallChain))
918 proxier.filterRules.Write(
919 "-A", string(kubeletFirewallChain),
920 "-m", "comment", "--comment", `"block incoming localnet connections"`,
921 "-d", "127.0.0.0/8",
922 "!", "-s", "127.0.0.0/8",
923 "-m", "conntrack",
924 "!", "--ctstate", "RELATED,ESTABLISHED,DNAT",
925 "-j", "DROP",
926 )
927 }
928
929
930 activeNATChains := sets.New[utiliptables.Chain]()
931
932
933
934
935
936 args := make([]string, 64)
937
938
939
940 totalEndpoints := 0
941 for svcName := range proxier.svcPortMap {
942 totalEndpoints += len(proxier.endpointsMap[svcName])
943 }
944 proxier.largeClusterMode = (totalEndpoints > largeClusterEndpointsThreshold)
945
946
947
948 serviceNoLocalEndpointsTotalInternal := 0
949 serviceNoLocalEndpointsTotalExternal := 0
950
951
952 for svcName, svc := range proxier.svcPortMap {
953 svcInfo, ok := svc.(*servicePortInfo)
954 if !ok {
955 klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
956 continue
957 }
958 protocol := strings.ToLower(string(svcInfo.Protocol()))
959 svcPortNameString := svcInfo.nameString
960
961
962
963
964
965 allEndpoints := proxier.endpointsMap[svcName]
966 clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
967
968
969 clusterPolicyChain := svcInfo.clusterPolicyChainName
970 usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
971
972
973 localPolicyChain := svcInfo.localPolicyChainName
974 usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
975
976
977
978
979
980
981
982 internalPolicyChain := clusterPolicyChain
983 hasInternalEndpoints := hasEndpoints
984 if svcInfo.InternalPolicyLocal() {
985 internalPolicyChain = localPolicyChain
986 if len(localEndpoints) == 0 {
987 hasInternalEndpoints = false
988 }
989 }
990 internalTrafficChain := internalPolicyChain
991
992
993
994
995
996
997
998
999
1000 externalPolicyChain := clusterPolicyChain
1001 hasExternalEndpoints := hasEndpoints
1002 if svcInfo.ExternalPolicyLocal() {
1003 externalPolicyChain = localPolicyChain
1004 if len(localEndpoints) == 0 {
1005 hasExternalEndpoints = false
1006 }
1007 }
1008 externalTrafficChain := svcInfo.externalChainName
1009
1010
1011
1012
1013 usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
1014
1015
1016
1017
1018 loadBalancerTrafficChain := externalTrafficChain
1019 fwChain := svcInfo.firewallChainName
1020 usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
1021 if usesFWChain {
1022 loadBalancerTrafficChain = fwChain
1023 }
1024
1025 var internalTrafficFilterTarget, internalTrafficFilterComment string
1026 var externalTrafficFilterTarget, externalTrafficFilterComment string
1027 if !hasEndpoints {
1028
1029
1030
1031
1032
1033 internalTrafficFilterTarget = "REJECT"
1034 internalTrafficFilterComment = fmt.Sprintf(`"%s has no endpoints"`, svcPortNameString)
1035 externalTrafficFilterTarget = "REJECT"
1036 externalTrafficFilterComment = internalTrafficFilterComment
1037 } else {
1038 if !hasInternalEndpoints {
1039
1040
1041
1042 internalTrafficFilterTarget = "DROP"
1043 internalTrafficFilterComment = fmt.Sprintf(`"%s has no local endpoints"`, svcPortNameString)
1044 serviceNoLocalEndpointsTotalInternal++
1045 }
1046 if !hasExternalEndpoints {
1047
1048
1049
1050
1051 externalTrafficFilterTarget = "DROP"
1052 externalTrafficFilterComment = fmt.Sprintf(`"%s has no local endpoints"`, svcPortNameString)
1053 serviceNoLocalEndpointsTotalExternal++
1054 }
1055 }
1056
1057 filterRules := proxier.filterRules
1058 natChains := proxier.natChains
1059 natRules := proxier.natRules
1060
1061
1062 if hasInternalEndpoints {
1063 natRules.Write(
1064 "-A", string(kubeServicesChain),
1065 "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
1066 "-m", protocol, "-p", protocol,
1067 "-d", svcInfo.ClusterIP().String(),
1068 "--dport", strconv.Itoa(svcInfo.Port()),
1069 "-j", string(internalTrafficChain))
1070 } else {
1071
1072 filterRules.Write(
1073 "-A", string(kubeServicesChain),
1074 "-m", "comment", "--comment", internalTrafficFilterComment,
1075 "-m", protocol, "-p", protocol,
1076 "-d", svcInfo.ClusterIP().String(),
1077 "--dport", strconv.Itoa(svcInfo.Port()),
1078 "-j", internalTrafficFilterTarget,
1079 )
1080 }
1081
1082
1083 for _, externalIP := range svcInfo.ExternalIPs() {
1084 if hasEndpoints {
1085
1086
1087 natRules.Write(
1088 "-A", string(kubeServicesChain),
1089 "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString),
1090 "-m", protocol, "-p", protocol,
1091 "-d", externalIP.String(),
1092 "--dport", strconv.Itoa(svcInfo.Port()),
1093 "-j", string(externalTrafficChain))
1094 }
1095 if !hasExternalEndpoints {
1096
1097
1098
1099 filterRules.Write(
1100 "-A", string(kubeExternalServicesChain),
1101 "-m", "comment", "--comment", externalTrafficFilterComment,
1102 "-m", protocol, "-p", protocol,
1103 "-d", externalIP.String(),
1104 "--dport", strconv.Itoa(svcInfo.Port()),
1105 "-j", externalTrafficFilterTarget,
1106 )
1107 }
1108 }
1109
1110
1111 for _, lbip := range svcInfo.LoadBalancerVIPs() {
1112 if hasEndpoints {
1113 natRules.Write(
1114 "-A", string(kubeServicesChain),
1115 "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
1116 "-m", protocol, "-p", protocol,
1117 "-d", lbip.String(),
1118 "--dport", strconv.Itoa(svcInfo.Port()),
1119 "-j", string(loadBalancerTrafficChain))
1120
1121 }
1122 if usesFWChain {
1123 filterRules.Write(
1124 "-A", string(kubeProxyFirewallChain),
1125 "-m", "comment", "--comment", fmt.Sprintf(`"%s traffic not accepted by %s"`, svcPortNameString, svcInfo.firewallChainName),
1126 "-m", protocol, "-p", protocol,
1127 "-d", lbip.String(),
1128 "--dport", strconv.Itoa(svcInfo.Port()),
1129 "-j", "DROP")
1130 }
1131 }
1132 if !hasExternalEndpoints {
1133
1134
1135
1136 for _, lbip := range svcInfo.LoadBalancerVIPs() {
1137 filterRules.Write(
1138 "-A", string(kubeExternalServicesChain),
1139 "-m", "comment", "--comment", externalTrafficFilterComment,
1140 "-m", protocol, "-p", protocol,
1141 "-d", lbip.String(),
1142 "--dport", strconv.Itoa(svcInfo.Port()),
1143 "-j", externalTrafficFilterTarget,
1144 )
1145 }
1146 }
1147
1148
1149 if svcInfo.NodePort() != 0 {
1150 if hasEndpoints {
1151
1152
1153
1154 natRules.Write(
1155 "-A", string(kubeNodePortsChain),
1156 "-m", "comment", "--comment", svcPortNameString,
1157 "-m", protocol, "-p", protocol,
1158 "--dport", strconv.Itoa(svcInfo.NodePort()),
1159 "-j", string(externalTrafficChain))
1160 }
1161 if !hasExternalEndpoints {
1162
1163
1164
1165 filterRules.Write(
1166 "-A", string(kubeExternalServicesChain),
1167 "-m", "comment", "--comment", externalTrafficFilterComment,
1168 "-m", "addrtype", "--dst-type", "LOCAL",
1169 "-m", protocol, "-p", protocol,
1170 "--dport", strconv.Itoa(svcInfo.NodePort()),
1171 "-j", externalTrafficFilterTarget,
1172 )
1173 }
1174 }
1175
1176
1177 if svcInfo.HealthCheckNodePort() != 0 {
1178
1179
1180 filterRules.Write(
1181 "-A", string(kubeNodePortsChain),
1182 "-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcPortNameString),
1183 "-m", "tcp", "-p", "tcp",
1184 "--dport", strconv.Itoa(svcInfo.HealthCheckNodePort()),
1185 "-j", "ACCEPT",
1186 )
1187 }
1188
1189
1190
1191
1192
1193 if tryPartialSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) {
1194 natChains = skippedNatChains
1195 natRules = skippedNatRules
1196 }
1197
1198
1199 if hasInternalEndpoints {
1200 args = append(args[:0],
1201 "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
1202 "-m", protocol, "-p", protocol,
1203 "-d", svcInfo.ClusterIP().String(),
1204 "--dport", strconv.Itoa(svcInfo.Port()),
1205 )
1206 if proxier.masqueradeAll {
1207 natRules.Write(
1208 "-A", string(internalTrafficChain),
1209 args,
1210 "-j", string(kubeMarkMasqChain))
1211 } else if proxier.localDetector.IsImplemented() {
1212
1213
1214
1215
1216
1217 natRules.Write(
1218 "-A", string(internalTrafficChain),
1219 args,
1220 proxier.localDetector.IfNotLocal(),
1221 "-j", string(kubeMarkMasqChain))
1222 }
1223 }
1224
1225
1226
1227
1228
1229 if usesExternalTrafficChain {
1230 natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))
1231 activeNATChains.Insert(externalTrafficChain)
1232
1233 if !svcInfo.ExternalPolicyLocal() {
1234
1235
1236 natRules.Write(
1237 "-A", string(externalTrafficChain),
1238 "-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString),
1239 "-j", string(kubeMarkMasqChain))
1240 } else {
1241
1242
1243
1244 if proxier.localDetector.IsImplemented() {
1245
1246
1247
1248
1249 natRules.Write(
1250 "-A", string(externalTrafficChain),
1251 "-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString),
1252 proxier.localDetector.IfLocal(),
1253 "-j", string(clusterPolicyChain))
1254 }
1255
1256
1257
1258
1259 natRules.Write(
1260 "-A", string(externalTrafficChain),
1261 "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString),
1262 "-m", "addrtype", "--src-type", "LOCAL",
1263 "-j", string(kubeMarkMasqChain))
1264
1265
1266
1267
1268 natRules.Write(
1269 "-A", string(externalTrafficChain),
1270 "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString),
1271 "-m", "addrtype", "--src-type", "LOCAL",
1272 "-j", string(clusterPolicyChain))
1273 }
1274
1275
1276 if hasExternalEndpoints {
1277 natRules.Write(
1278 "-A", string(externalTrafficChain),
1279 "-j", string(externalPolicyChain))
1280 }
1281 }
1282
1283
1284 if usesFWChain {
1285 natChains.Write(utiliptables.MakeChainLine(fwChain))
1286 activeNATChains.Insert(fwChain)
1287
1288
1289
1290
1291
1292
1293 args = append(args[:0],
1294 "-A", string(fwChain),
1295 "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
1296 )
1297
1298
1299 allowFromNode := false
1300 for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
1301 natRules.Write(args, "-s", cidr.String(), "-j", string(externalTrafficChain))
1302 if cidr.Contains(proxier.nodeIP) {
1303 allowFromNode = true
1304 }
1305 }
1306
1307
1308
1309
1310
1311 if allowFromNode {
1312 for _, lbip := range svcInfo.LoadBalancerVIPs() {
1313 natRules.Write(
1314 args,
1315 "-s", lbip.String(),
1316 "-j", string(externalTrafficChain))
1317 }
1318 }
1319
1320
1321
1322 natRules.Write(
1323 "-A", string(fwChain),
1324 "-m", "comment", "--comment", fmt.Sprintf(`"other traffic to %s will be dropped by KUBE-PROXY-FIREWALL"`, svcPortNameString),
1325 )
1326 }
1327
1328
1329
1330 if usesClusterPolicyChain {
1331 natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
1332 activeNATChains.Insert(clusterPolicyChain)
1333 proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
1334 }
1335
1336
1337
1338 if usesLocalPolicyChain {
1339 natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
1340 activeNATChains.Insert(localPolicyChain)
1341 proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args)
1342 }
1343
1344
1345 for _, ep := range allLocallyReachableEndpoints {
1346 epInfo, ok := ep.(*endpointInfo)
1347 if !ok {
1348 klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
1349 continue
1350 }
1351
1352 endpointChain := epInfo.ChainName
1353
1354
1355 natChains.Write(utiliptables.MakeChainLine(endpointChain))
1356 activeNATChains.Insert(endpointChain)
1357
1358 args = append(args[:0], "-A", string(endpointChain))
1359 args = proxier.appendServiceCommentLocked(args, svcPortNameString)
1360
1361 natRules.Write(
1362 args,
1363 "-s", epInfo.IP(),
1364 "-j", string(kubeMarkMasqChain))
1365
1366 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1367 args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
1368 }
1369
1370 args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.String())
1371 natRules.Write(args)
1372 }
1373 }
1374
1375
1376
1377
1378
1379 deletedChains := 0
1380 if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod {
1381 proxier.iptablesData.Reset()
1382 if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil {
1383 existingNATChains := utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
1384 for chain := range existingNATChains.Difference(activeNATChains) {
1385 chainString := string(chain)
1386 if !isServiceChainName(chainString) {
1387
1388 continue
1389 }
1390
1391
1392
1393 proxier.natChains.Write(utiliptables.MakeChainLine(chain))
1394 proxier.natRules.Write("-X", chainString)
1395 deletedChains++
1396 }
1397 proxier.lastIPTablesCleanup = time.Now()
1398 } else {
1399 klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
1400 }
1401 }
1402
1403
1404
1405 if proxier.nodePortAddresses.MatchAll() {
1406 destinations := []string{"-m", "addrtype", "--dst-type", "LOCAL"}
1407
1408
1409 if isIPv6 {
1410 destinations = append(destinations, "!", "-d", "::1/128")
1411 } else if !proxier.localhostNodePorts {
1412 destinations = append(destinations, "!", "-d", "127.0.0.0/8")
1413 }
1414
1415 proxier.natRules.Write(
1416 "-A", string(kubeServicesChain),
1417 "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
1418 destinations,
1419 "-j", string(kubeNodePortsChain))
1420 } else {
1421 nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
1422 if err != nil {
1423 klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
1424 }
1425 for _, ip := range nodeIPs {
1426 if ip.IsLoopback() {
1427 if isIPv6 {
1428 klog.ErrorS(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported on IPv6", "address", ip.String())
1429 continue
1430 } else if !proxier.localhostNodePorts {
1431 klog.ErrorS(nil, "--nodeport-addresses includes localhost but --iptables-localhost-nodeports=false was passed", "address", ip.String())
1432 continue
1433 }
1434 }
1435
1436
1437 proxier.natRules.Write(
1438 "-A", string(kubeServicesChain),
1439 "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
1440 "-d", ip.String(),
1441 "-j", string(kubeNodePortsChain))
1442 }
1443 }
1444
1445
1446
1447
1448
1449 if !proxier.conntrackTCPLiberal {
1450 proxier.filterRules.Write(
1451 "-A", string(kubeForwardChain),
1452 "-m", "conntrack",
1453 "--ctstate", "INVALID",
1454 "-j", "DROP",
1455 )
1456 }
1457
1458
1459
1460
1461 proxier.filterRules.Write(
1462 "-A", string(kubeForwardChain),
1463 "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
1464 "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
1465 "-j", "ACCEPT",
1466 )
1467
1468
1469
1470 proxier.filterRules.Write(
1471 "-A", string(kubeForwardChain),
1472 "-m", "comment", "--comment", `"kubernetes forwarding conntrack rule"`,
1473 "-m", "conntrack",
1474 "--ctstate", "RELATED,ESTABLISHED",
1475 "-j", "ACCEPT",
1476 )
1477
1478 metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
1479 metrics.IptablesRulesLastSync.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
1480 metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() + skippedNatRules.Lines() - deletedChains))
1481 metrics.IptablesRulesLastSync.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() - deletedChains))
1482
1483
1484 proxier.iptablesData.Reset()
1485 proxier.iptablesData.WriteString("*filter\n")
1486 proxier.iptablesData.Write(proxier.filterChains.Bytes())
1487 proxier.iptablesData.Write(proxier.filterRules.Bytes())
1488 proxier.iptablesData.WriteString("COMMIT\n")
1489 proxier.iptablesData.WriteString("*nat\n")
1490 proxier.iptablesData.Write(proxier.natChains.Bytes())
1491 proxier.iptablesData.Write(proxier.natRules.Bytes())
1492 proxier.iptablesData.WriteString("COMMIT\n")
1493
1494 klog.V(2).InfoS("Reloading service iptables data",
1495 "numServices", len(proxier.svcPortMap),
1496 "numEndpoints", totalEndpoints,
1497 "numFilterChains", proxier.filterChains.Lines(),
1498 "numFilterRules", proxier.filterRules.Lines(),
1499 "numNATChains", proxier.natChains.Lines(),
1500 "numNATRules", proxier.natRules.Lines(),
1501 )
1502 klog.V(9).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
1503
1504
1505 err := proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
1506 if err != nil {
1507 if pErr, ok := err.(utiliptables.ParseError); ok {
1508 lines := utiliptables.ExtractLines(proxier.iptablesData.Bytes(), pErr.Line(), 3)
1509 klog.ErrorS(pErr, "Failed to execute iptables-restore", "rules", lines)
1510 } else {
1511 klog.ErrorS(err, "Failed to execute iptables-restore")
1512 }
1513 metrics.IptablesRestoreFailuresTotal.Inc()
1514 return
1515 }
1516 success = true
1517 proxier.needFullSync = false
1518
1519 for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
1520 for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
1521 latency := metrics.SinceInSeconds(lastChangeTriggerTime)
1522 metrics.NetworkProgrammingLatency.Observe(latency)
1523 klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
1524 }
1525 }
1526
1527 metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
1528 metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
1529 if proxier.healthzServer != nil {
1530 proxier.healthzServer.Updated(proxier.ipFamily)
1531 }
1532 metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
1533
1534
1535
1536
1537 if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
1538 klog.ErrorS(err, "Error syncing healthcheck services")
1539 }
1540 if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
1541 klog.ErrorS(err, "Error syncing healthcheck endpoints")
1542 }
1543
1544
1545 conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
1546 }
1547
1548 func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
1549
1550 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1551 for _, ep := range endpoints {
1552 epInfo, ok := ep.(*endpointInfo)
1553 if !ok {
1554 continue
1555 }
1556 comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())
1557
1558 args = append(args[:0],
1559 "-A", string(svcChain),
1560 )
1561 args = proxier.appendServiceCommentLocked(args, comment)
1562 args = append(args,
1563 "-m", "recent", "--name", string(epInfo.ChainName),
1564 "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
1565 "-j", string(epInfo.ChainName),
1566 )
1567 natRules.Write(args)
1568 }
1569 }
1570
1571
1572 numEndpoints := len(endpoints)
1573 for i, ep := range endpoints {
1574 epInfo, ok := ep.(*endpointInfo)
1575 if !ok {
1576 continue
1577 }
1578 comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())
1579
1580 args = append(args[:0], "-A", string(svcChain))
1581 args = proxier.appendServiceCommentLocked(args, comment)
1582 if i < (numEndpoints - 1) {
1583
1584 args = append(args,
1585 "-m", "statistic",
1586 "--mode", "random",
1587 "--probability", proxier.probability(numEndpoints-i))
1588 }
1589
1590 natRules.Write(args, "-j", string(epInfo.ChainName))
1591 }
1592 }
1593
View as plain text