1
2
3
4
19
20 package nftables
21
22
23
24
25
26 import (
27 "context"
28 "crypto/sha256"
29 "encoding/base32"
30 "fmt"
31 "net"
32 "reflect"
33 "strconv"
34 "strings"
35 "sync"
36 "sync/atomic"
37 "time"
38
39 v1 "k8s.io/api/core/v1"
40 discovery "k8s.io/api/discovery/v1"
41 "k8s.io/apimachinery/pkg/types"
42 "k8s.io/apimachinery/pkg/util/sets"
43 "k8s.io/apimachinery/pkg/util/wait"
44 "k8s.io/client-go/tools/events"
45 utilsysctl "k8s.io/component-helpers/node/util/sysctl"
46 "k8s.io/klog/v2"
47 "k8s.io/kubernetes/pkg/proxy"
48 "k8s.io/kubernetes/pkg/proxy/conntrack"
49 "k8s.io/kubernetes/pkg/proxy/healthcheck"
50 "k8s.io/kubernetes/pkg/proxy/metaproxier"
51 "k8s.io/kubernetes/pkg/proxy/metrics"
52 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
53 proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
54 "k8s.io/kubernetes/pkg/util/async"
55 utilexec "k8s.io/utils/exec"
56 netutils "k8s.io/utils/net"
57 "k8s.io/utils/ptr"
58 "sigs.k8s.io/knftables"
59 )
60
// kubeProxyTable is the nftables table that holds every object created by
// this proxier.
const kubeProxyTable = "kube-proxy"

// Base chains. Each one is attached to a netfilter hook by the
// nftablesBaseChains table.
const (
	filterPreroutingChain     = "filter-prerouting"
	filterInputChain          = "filter-input"
	filterForwardChain        = "filter-forward"
	filterOutputChain         = "filter-output"
	filterOutputPostDNATChain = "filter-output-post-dnat"
	natPreroutingChain        = "nat-prerouting"
	natOutputChain            = "nat-output"
	natPostroutingChain       = "nat-postrouting"
)

// Service dispatch: the services chain looks destinations up in the two
// verdict maps.
const (
	servicesChain       = "services"
	serviceIPsMap       = "service-ips"
	serviceNodePortsMap = "service-nodeports"
)

// nodePortIPsSet is the set of node IPs that accept NodePort traffic.
const nodePortIPsSet = "nodeport-ips"

// clusterIPsSet is the set of active ClusterIPs.
const clusterIPsSet = "cluster-ips"

// Handling of traffic to services / nodeports that have no endpoints.
const (
	serviceEndpointsCheckChain  = "service-endpoints-check"
	nodePortEndpointsCheckChain = "nodeport-endpoints-check"
	noEndpointServicesMap       = "no-endpoint-services"
	noEndpointNodePortsMap      = "no-endpoint-nodeports"
	rejectChain                 = "reject-chain"
)

// clusterIPsCheckChain rejects or drops traffic aimed at ClusterIPs that did
// not match a service port.
const clusterIPsCheckChain = "cluster-ips-check"

// LoadBalancerSourceRanges enforcement.
const (
	firewallIPsMap     = "firewall-ips"
	firewallCheckChain = "firewall-check"
)

// Masquerading.
const (
	markMasqChain     = "mark-for-masquerade"
	masqueradingChain = "masquerading"
)
105
106
107 func NewDualStackProxier(
108 sysctl utilsysctl.Interface,
109 syncPeriod time.Duration,
110 minSyncPeriod time.Duration,
111 masqueradeAll bool,
112 masqueradeBit int,
113 localDetectors [2]proxyutiliptables.LocalTrafficDetector,
114 hostname string,
115 nodeIPs map[v1.IPFamily]net.IP,
116 recorder events.EventRecorder,
117 healthzServer *healthcheck.ProxierHealthServer,
118 nodePortAddresses []string,
119 initOnly bool,
120 ) (proxy.Provider, error) {
121
122 ipv4Proxier, err := NewProxier(v1.IPv4Protocol, sysctl,
123 syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname,
124 nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
125 if err != nil {
126 return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
127 }
128
129 ipv6Proxier, err := NewProxier(v1.IPv6Protocol, sysctl,
130 syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname,
131 nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
132 if err != nil {
133 return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
134 }
135 if initOnly {
136 return nil, nil
137 }
138 return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
139 }
140
141
142 type Proxier struct {
143
144 ipFamily v1.IPFamily
145
146
147
148
149
150 endpointsChanges *proxy.EndpointsChangeTracker
151 serviceChanges *proxy.ServiceChangeTracker
152
153 mu sync.Mutex
154 svcPortMap proxy.ServicePortMap
155 endpointsMap proxy.EndpointsMap
156 nodeLabels map[string]string
157
158
159
160 endpointSlicesSynced bool
161 servicesSynced bool
162 initialized int32
163 syncRunner *async.BoundedFrequencyRunner
164 syncPeriod time.Duration
165 flushed bool
166
167
168 nftables knftables.Interface
169 masqueradeAll bool
170 masqueradeMark string
171 conntrack conntrack.Interface
172 localDetector proxyutiliptables.LocalTrafficDetector
173 hostname string
174 nodeIP net.IP
175 recorder events.EventRecorder
176
177 serviceHealthServer healthcheck.ServiceHealthServer
178 healthzServer *healthcheck.ProxierHealthServer
179
180
181 nodePortAddresses *proxyutil.NodePortAddresses
182
183
184 networkInterfacer proxyutil.NetworkInterfacer
185
186
187 staleChains map[string]time.Time
188
189
190
191 serviceCIDRs string
192 }
193
194
195 var _ proxy.Provider = &Proxier{}
196
197
198
199
200 func NewProxier(ipFamily v1.IPFamily,
201 sysctl utilsysctl.Interface,
202 syncPeriod time.Duration,
203 minSyncPeriod time.Duration,
204 masqueradeAll bool,
205 masqueradeBit int,
206 localDetector proxyutiliptables.LocalTrafficDetector,
207 hostname string,
208 nodeIP net.IP,
209 recorder events.EventRecorder,
210 healthzServer *healthcheck.ProxierHealthServer,
211 nodePortAddressStrings []string,
212 initOnly bool,
213 ) (*Proxier, error) {
214 nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings, nodeIP)
215
216 if initOnly {
217 klog.InfoS("System initialized and --init-only specified")
218 return nil, nil
219 }
220
221
222 masqueradeValue := 1 << uint(masqueradeBit)
223 masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
224 klog.V(2).InfoS("Using nftables mark for masquerade", "ipFamily", ipFamily, "mark", masqueradeMark)
225
226 serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
227
228 var nftablesFamily knftables.Family
229 if ipFamily == v1.IPv4Protocol {
230 nftablesFamily = knftables.IPv4Family
231 } else {
232 nftablesFamily = knftables.IPv6Family
233 }
234 nft, err := knftables.New(nftablesFamily, kubeProxyTable)
235 if err != nil {
236 return nil, err
237 }
238
239 proxier := &Proxier{
240 ipFamily: ipFamily,
241 svcPortMap: make(proxy.ServicePortMap),
242 serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
243 endpointsMap: make(proxy.EndpointsMap),
244 endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
245 syncPeriod: syncPeriod,
246 nftables: nft,
247 masqueradeAll: masqueradeAll,
248 masqueradeMark: masqueradeMark,
249 conntrack: conntrack.NewExec(utilexec.New()),
250 localDetector: localDetector,
251 hostname: hostname,
252 nodeIP: nodeIP,
253 recorder: recorder,
254 serviceHealthServer: serviceHealthServer,
255 healthzServer: healthzServer,
256 nodePortAddresses: nodePortAddresses,
257 networkInterfacer: proxyutil.RealNetwork{},
258 staleChains: make(map[string]time.Time),
259 }
260
261 burstSyncs := 2
262 klog.V(2).InfoS("NFTables sync params", "ipFamily", ipFamily, "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
263 proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
264
265 return proxier, nil
266 }
267
268
269 type servicePortInfo struct {
270 *proxy.BaseServicePortInfo
271
272 nameString string
273 clusterPolicyChainName string
274 localPolicyChainName string
275 externalChainName string
276 firewallChainName string
277 }
278
279
280 func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
281 svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
282
283
284 svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
285 svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
286 svcPort.nameString = svcPortName.String()
287
288 chainNameBase := servicePortChainNameBase(&svcPortName, strings.ToLower(string(svcPort.Protocol())))
289 svcPort.clusterPolicyChainName = servicePortPolicyClusterChainNamePrefix + chainNameBase
290 svcPort.localPolicyChainName = servicePortPolicyLocalChainNamePrefix + chainNameBase
291 svcPort.externalChainName = serviceExternalChainNamePrefix + chainNameBase
292 svcPort.firewallChainName = servicePortFirewallChainNamePrefix + chainNameBase
293
294 return svcPort
295 }
296
297
298 type endpointInfo struct {
299 *proxy.BaseEndpointInfo
300
301 chainName string
302 affinitySetName string
303 }
304
305
306 func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
307 chainNameBase := servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String())
308 return &endpointInfo{
309 BaseEndpointInfo: baseInfo,
310 chainName: servicePortEndpointChainNamePrefix + chainNameBase,
311 affinitySetName: servicePortEndpointAffinityNamePrefix + chainNameBase,
312 }
313 }
314
315
316
317
318
319
320
321
322
323
324 type nftablesBaseChain struct {
325 name string
326 chainType knftables.BaseChainType
327 hook knftables.BaseChainHook
328 priority knftables.BaseChainPriority
329 }
330
331 var nftablesBaseChains = []nftablesBaseChain{
332
333
334 {filterPreroutingChain, knftables.FilterType, knftables.PreroutingHook, knftables.DNATPriority + "-10"},
335 {filterInputChain, knftables.FilterType, knftables.InputHook, knftables.DNATPriority + "-10"},
336 {filterForwardChain, knftables.FilterType, knftables.ForwardHook, knftables.DNATPriority + "-10"},
337 {filterOutputChain, knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "-10"},
338 {filterOutputPostDNATChain, knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "+10"},
339 {natPreroutingChain, knftables.NATType, knftables.PreroutingHook, knftables.DNATPriority},
340 {natOutputChain, knftables.NATType, knftables.OutputHook, knftables.DNATPriority},
341 {natPostroutingChain, knftables.NATType, knftables.PostroutingHook, knftables.SNATPriority},
342 }
343
344
345
346
347 type nftablesJumpChain struct {
348 dstChain string
349 srcChain string
350 extraArgs string
351 }
352
353 var nftablesJumpChains = []nftablesJumpChain{
354
355
356
357 {nodePortEndpointsCheckChain, filterInputChain, "ct state new"},
358 {serviceEndpointsCheckChain, filterInputChain, "ct state new"},
359 {serviceEndpointsCheckChain, filterForwardChain, "ct state new"},
360 {serviceEndpointsCheckChain, filterOutputChain, "ct state new"},
361
362 {firewallCheckChain, filterPreroutingChain, "ct state new"},
363 {firewallCheckChain, filterOutputChain, "ct state new"},
364
365 {servicesChain, natOutputChain, ""},
366 {servicesChain, natPreroutingChain, ""},
367 {masqueradingChain, natPostroutingChain, ""},
368
369 {clusterIPsCheckChain, filterForwardChain, "ct state new"},
370 {clusterIPsCheckChain, filterOutputPostDNATChain, "ct state new"},
371 }
372
373
374
375
376 func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string]) {
377 if createdChains.Has(chain) {
378 return
379 }
380 tx.Add(&knftables.Chain{
381 Name: chain,
382 })
383 tx.Flush(&knftables.Chain{
384 Name: chain,
385 })
386 createdChains.Insert(chain)
387 }
388
389 func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
390 ipX := "ip"
391 ipvX_addr := "ipv4_addr"
392 noLocalhost := "ip daddr != 127.0.0.0/8"
393 if proxier.ipFamily == v1.IPv6Protocol {
394 ipX = "ip6"
395 ipvX_addr = "ipv6_addr"
396 noLocalhost = "ip6 daddr != ::1"
397 }
398
399 tx.Add(&knftables.Table{
400 Comment: ptr.To("rules for kube-proxy"),
401 })
402
403
404
405
406 if !proxier.flushed {
407 for _, bc := range nftablesBaseChains {
408 chain := &knftables.Chain{
409 Name: bc.name,
410 }
411 tx.Add(chain)
412 tx.Delete(chain)
413 }
414 proxier.flushed = true
415 }
416
417
418 for _, bc := range nftablesBaseChains {
419 chain := &knftables.Chain{
420 Name: bc.name,
421 Type: ptr.To(bc.chainType),
422 Hook: ptr.To(bc.hook),
423 Priority: ptr.To(bc.priority),
424 }
425 tx.Add(chain)
426 tx.Flush(chain)
427 }
428
429
430 createdChains := sets.New[string]()
431 for _, c := range nftablesJumpChains {
432 ensureChain(c.dstChain, tx, createdChains)
433 tx.Add(&knftables.Rule{
434 Chain: c.srcChain,
435 Rule: knftables.Concat(
436 c.extraArgs,
437 "jump", c.dstChain,
438 ),
439 })
440 }
441
442
443 for _, chain := range []string{servicesChain, clusterIPsCheckChain, masqueradingChain, markMasqChain} {
444 ensureChain(chain, tx, createdChains)
445 }
446
447
448 tx.Add(&knftables.Rule{
449 Chain: markMasqChain,
450 Rule: knftables.Concat(
451 "mark", "set", "mark", "or", proxier.masqueradeMark,
452 ),
453 })
454
455 tx.Add(&knftables.Rule{
456 Chain: masqueradingChain,
457 Rule: knftables.Concat(
458 "mark", "and", proxier.masqueradeMark, "==", "0",
459 "return",
460 ),
461 })
462 tx.Add(&knftables.Rule{
463 Chain: masqueradingChain,
464 Rule: knftables.Concat(
465 "mark", "set", "mark", "xor", proxier.masqueradeMark,
466 ),
467 })
468 tx.Add(&knftables.Rule{
469 Chain: masqueradingChain,
470 Rule: "masquerade fully-random",
471 })
472
473
474 tx.Add(&knftables.Set{
475 Name: clusterIPsSet,
476 Type: ipvX_addr,
477 Comment: ptr.To("Active ClusterIPs"),
478 })
479
480
481 tx.Add(&knftables.Rule{
482 Chain: clusterIPsCheckChain,
483 Rule: knftables.Concat(
484 ipX, "daddr", "@", clusterIPsSet, "reject",
485 ),
486 Comment: ptr.To("Reject traffic to invalid ports of ClusterIPs"),
487 })
488
489
490 if len(proxier.serviceCIDRs) > 0 {
491 tx.Add(&knftables.Rule{
492 Chain: clusterIPsCheckChain,
493 Rule: knftables.Concat(
494 ipX, "daddr", "{", proxier.serviceCIDRs, "}",
495 "drop",
496 ),
497 Comment: ptr.To("Drop traffic to unallocated ClusterIPs"),
498 })
499 }
500
501
502
503
504 tx.Add(&knftables.Set{
505 Name: nodePortIPsSet,
506 Type: ipvX_addr,
507 Comment: ptr.To("IPs that accept NodePort traffic"),
508 })
509 if proxier.nodePortAddresses.MatchAll() {
510 tx.Delete(&knftables.Set{
511 Name: nodePortIPsSet,
512 })
513 } else {
514 tx.Flush(&knftables.Set{
515 Name: nodePortIPsSet,
516 })
517 nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
518 if err != nil {
519 klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
520 }
521 for _, ip := range nodeIPs {
522 if ip.IsLoopback() {
523 klog.ErrorS(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported", "address", ip.String())
524 continue
525 }
526 tx.Add(&knftables.Element{
527 Set: nodePortIPsSet,
528 Key: []string{
529 ip.String(),
530 },
531 })
532 }
533 }
534
535
536 tx.Add(&knftables.Map{
537 Name: noEndpointServicesMap,
538 Type: ipvX_addr + " . inet_proto . inet_service : verdict",
539 Comment: ptr.To("vmap to drop or reject packets to services with no endpoints"),
540 })
541 tx.Add(&knftables.Map{
542 Name: noEndpointNodePortsMap,
543 Type: "inet_proto . inet_service : verdict",
544 Comment: ptr.To("vmap to drop or reject packets to service nodeports with no endpoints"),
545 })
546
547 tx.Add(&knftables.Chain{
548 Name: rejectChain,
549 Comment: ptr.To("helper for @no-endpoint-services / @no-endpoint-nodeports"),
550 })
551 tx.Flush(&knftables.Chain{
552 Name: rejectChain,
553 })
554 tx.Add(&knftables.Rule{
555 Chain: rejectChain,
556 Rule: "reject",
557 })
558
559 tx.Add(&knftables.Rule{
560 Chain: serviceEndpointsCheckChain,
561 Rule: knftables.Concat(
562 ipX, "daddr", ".", "meta l4proto", ".", "th dport",
563 "vmap", "@", noEndpointServicesMap,
564 ),
565 })
566
567 if proxier.nodePortAddresses.MatchAll() {
568 tx.Add(&knftables.Rule{
569 Chain: nodePortEndpointsCheckChain,
570 Rule: knftables.Concat(
571 noLocalhost,
572 "meta l4proto . th dport",
573 "vmap", "@", noEndpointNodePortsMap,
574 ),
575 })
576 } else {
577 tx.Add(&knftables.Rule{
578 Chain: nodePortEndpointsCheckChain,
579 Rule: knftables.Concat(
580 ipX, "daddr", "@", nodePortIPsSet,
581 "meta l4proto . th dport",
582 "vmap", "@", noEndpointNodePortsMap,
583 ),
584 })
585 }
586
587
588 tx.Add(&knftables.Map{
589 Name: firewallIPsMap,
590 Type: ipvX_addr + " . inet_proto . inet_service : verdict",
591 Comment: ptr.To("destinations that are subject to LoadBalancerSourceRanges"),
592 })
593
594 ensureChain(firewallCheckChain, tx, createdChains)
595 tx.Add(&knftables.Rule{
596 Chain: firewallCheckChain,
597 Rule: knftables.Concat(
598 ipX, "daddr", ".", "meta l4proto", ".", "th dport",
599 "vmap", "@", firewallIPsMap,
600 ),
601 })
602
603
604 tx.Add(&knftables.Map{
605 Name: serviceIPsMap,
606 Type: ipvX_addr + " . inet_proto . inet_service : verdict",
607 Comment: ptr.To("ClusterIP, ExternalIP and LoadBalancer IP traffic"),
608 })
609 tx.Add(&knftables.Map{
610 Name: serviceNodePortsMap,
611 Type: "inet_proto . inet_service : verdict",
612 Comment: ptr.To("NodePort traffic"),
613 })
614 tx.Add(&knftables.Rule{
615 Chain: servicesChain,
616 Rule: knftables.Concat(
617 ipX, "daddr", ".", "meta l4proto", ".", "th dport",
618 "vmap", "@", serviceIPsMap,
619 ),
620 })
621 if proxier.nodePortAddresses.MatchAll() {
622 tx.Add(&knftables.Rule{
623 Chain: servicesChain,
624 Rule: knftables.Concat(
625 "fib daddr type local",
626 noLocalhost,
627 "meta l4proto . th dport",
628 "vmap", "@", serviceNodePortsMap,
629 ),
630 })
631 } else {
632 tx.Add(&knftables.Rule{
633 Chain: servicesChain,
634 Rule: knftables.Concat(
635 ipX, "daddr @nodeport-ips",
636 "meta l4proto . th dport",
637 "vmap", "@", serviceNodePortsMap,
638 ),
639 })
640 }
641 }
642
643
644
645 func CleanupLeftovers() bool {
646 var encounteredError bool
647
648 for _, family := range []knftables.Family{knftables.IPv4Family, knftables.IPv6Family} {
649 nft, err := knftables.New(family, kubeProxyTable)
650 if err == nil {
651 tx := nft.NewTransaction()
652 tx.Delete(&knftables.Table{})
653 err = nft.Run(context.TODO(), tx)
654 }
655 if err != nil && !knftables.IsNotFound(err) {
656 klog.ErrorS(err, "Error cleaning up nftables rules")
657 encounteredError = true
658 }
659 }
660
661 return encounteredError
662 }
663
664
665 func (proxier *Proxier) Sync() {
666 if proxier.healthzServer != nil {
667 proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
668 }
669 metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
670 proxier.syncRunner.Run()
671 }
672
673
674 func (proxier *Proxier) SyncLoop() {
675
676 if proxier.healthzServer != nil {
677 proxier.healthzServer.Updated(proxier.ipFamily)
678 }
679
680
681 metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
682 proxier.syncRunner.Loop(wait.NeverStop)
683 }
684
685 func (proxier *Proxier) setInitialized(value bool) {
686 var initialized int32
687 if value {
688 initialized = 1
689 }
690 atomic.StoreInt32(&proxier.initialized, initialized)
691 }
692
693 func (proxier *Proxier) isInitialized() bool {
694 return atomic.LoadInt32(&proxier.initialized) > 0
695 }
696
697
698
699 func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
700 proxier.OnServiceUpdate(nil, service)
701 }
702
703
704
705 func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
706 if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
707 proxier.Sync()
708 }
709 }
710
711
712
713 func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
714 proxier.OnServiceUpdate(service, nil)
715
716 }
717
718
719
720 func (proxier *Proxier) OnServiceSynced() {
721 proxier.mu.Lock()
722 proxier.servicesSynced = true
723 proxier.setInitialized(proxier.endpointSlicesSynced)
724 proxier.mu.Unlock()
725
726
727 proxier.syncProxyRules()
728 }
729
730
731
732 func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
733 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
734 proxier.Sync()
735 }
736 }
737
738
739
740 func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
741 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
742 proxier.Sync()
743 }
744 }
745
746
747
748 func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
749 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
750 proxier.Sync()
751 }
752 }
753
754
755
756 func (proxier *Proxier) OnEndpointSlicesSynced() {
757 proxier.mu.Lock()
758 proxier.endpointSlicesSynced = true
759 proxier.setInitialized(proxier.servicesSynced)
760 proxier.mu.Unlock()
761
762
763 proxier.syncProxyRules()
764 }
765
766
767
768 func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
769 if node.Name != proxier.hostname {
770 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
771 "eventNode", node.Name, "currentNode", proxier.hostname)
772 return
773 }
774
775 if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
776 return
777 }
778
779 proxier.mu.Lock()
780 proxier.nodeLabels = map[string]string{}
781 for k, v := range node.Labels {
782 proxier.nodeLabels[k] = v
783 }
784 proxier.mu.Unlock()
785 klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
786
787 proxier.Sync()
788 }
789
790
791
792 func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
793 if node.Name != proxier.hostname {
794 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
795 "eventNode", node.Name, "currentNode", proxier.hostname)
796 return
797 }
798
799 if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
800 return
801 }
802
803 proxier.mu.Lock()
804 proxier.nodeLabels = map[string]string{}
805 for k, v := range node.Labels {
806 proxier.nodeLabels[k] = v
807 }
808 proxier.mu.Unlock()
809 klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
810
811 proxier.Sync()
812 }
813
814
815
816 func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
817 if node.Name != proxier.hostname {
818 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
819 "eventNode", node.Name, "currentNode", proxier.hostname)
820 return
821 }
822
823 proxier.mu.Lock()
824 proxier.nodeLabels = nil
825 proxier.mu.Unlock()
826
827 proxier.Sync()
828 }
829
830
831
832 func (proxier *Proxier) OnNodeSynced() {
833 }
834
835
836
837 func (proxier *Proxier) OnServiceCIDRsChanged(cidrs []string) {
838 proxier.mu.Lock()
839 defer proxier.mu.Unlock()
840
841 cidrsForProxier := make([]string, 0)
842 for _, cidr := range cidrs {
843 isIPv4CIDR := netutils.IsIPv4CIDRString(cidr)
844 if proxier.ipFamily == v1.IPv4Protocol && isIPv4CIDR {
845 cidrsForProxier = append(cidrsForProxier, cidr)
846 }
847
848 if proxier.ipFamily == v1.IPv6Protocol && !isIPv4CIDR {
849 cidrsForProxier = append(cidrsForProxier, cidr)
850 }
851 }
852 proxier.serviceCIDRs = strings.Join(cidrsForProxier, ",")
853 }
854
855 const (
856
857
858 chainNamePrefixLengthMax = 16
859
860
861
862 chainNameBaseLengthMax = knftables.NameLengthMax - chainNamePrefixLengthMax
863 )
864
// Prefixes of the generated per-service and per-endpoint object names. Each
// is followed by a hashed name base.
const (
	// Per-service chains.
	servicePortPolicyClusterChainNamePrefix = "service-"
	servicePortPolicyLocalChainNamePrefix   = "local-"
	serviceExternalChainNamePrefix          = "external-"
	servicePortFirewallChainNamePrefix      = "firewall-"

	// Per-endpoint chain and affinity set.
	servicePortEndpointChainNamePrefix    = "endpoint-"
	servicePortEndpointAffinityNamePrefix = "affinity-"
)
873
874
875
876
877
878
879 func hashAndTruncate(name string) string {
880 hash := sha256.Sum256([]byte(name))
881 encoded := base32.StdEncoding.EncodeToString(hash[:])
882 name = encoded[:8] + "-" + name
883 if len(name) > chainNameBaseLengthMax {
884 name = name[:chainNameBaseLengthMax-3] + "..."
885 }
886 return name
887 }
888
889
890
891
892 func servicePortChainNameBase(servicePortName *proxy.ServicePortName, protocol string) string {
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907 name := fmt.Sprintf("%s/%s/%s/%s",
908 servicePortName.NamespacedName.Namespace,
909 servicePortName.NamespacedName.Name,
910 protocol,
911 servicePortName.Port,
912 )
913
914
915
916
917
918 return hashAndTruncate(name)
919 }
920
921
922
923
924
925 func servicePortEndpointChainNameBase(servicePortName *proxy.ServicePortName, protocol, endpoint string) string {
926
927
928
929
930
931
932
933 endpointIP, endpointPort, _ := net.SplitHostPort(endpoint)
934 if strings.Contains(endpointIP, ":") {
935 endpointIP = strings.ReplaceAll(endpointIP, ":", ".")
936 }
937
938
939
940 name := fmt.Sprintf("%s/%s/%s/%s__%s/%s",
941 servicePortName.NamespacedName.Namespace,
942 servicePortName.NamespacedName.Name,
943 protocol,
944 servicePortName.Port,
945 endpointIP,
946 endpointPort,
947 )
948
949
950
951
952
953
954 return hashAndTruncate(name)
955 }
956
// isServiceChainName reports whether chainString contains a "/". Generated
// per-service and per-endpoint names embed "<namespace>/<name>/..." and so
// contain one; none of the fixed chain names declared in this file do.
func isServiceChainName(chainString string) bool {
	return strings.IndexByte(chainString, '/') >= 0
}
964
965 func isAffinitySetName(set string) bool {
966 return strings.HasPrefix(set, servicePortEndpointAffinityNamePrefix)
967 }
968
969
970
971 func (proxier *Proxier) syncProxyRules() {
972 proxier.mu.Lock()
973 defer proxier.mu.Unlock()
974
975
976 if !proxier.isInitialized() {
977 klog.V(2).InfoS("Not syncing nftables until Services and Endpoints have been received from master")
978 return
979 }
980
981
982
983
984
985
986 start := time.Now()
987 defer func() {
988 metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
989 klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
990 }()
991
992 serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
993 endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
994
995 klog.V(2).InfoS("Syncing nftables rules")
996
997 success := false
998 defer func() {
999 if !success {
1000 klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
1001 proxier.syncRunner.RetryAfter(proxier.syncPeriod)
1002 }
1003 }()
1004
1005
1006
1007 if len(proxier.staleChains) > 0 {
1008 oneSecondAgo := start.Add(-time.Second)
1009 tx := proxier.nftables.NewTransaction()
1010 deleted := 0
1011 for chain, modtime := range proxier.staleChains {
1012 if modtime.Before(oneSecondAgo) {
1013 tx.Delete(&knftables.Chain{
1014 Name: chain,
1015 })
1016 delete(proxier.staleChains, chain)
1017 deleted++
1018 }
1019 }
1020 if deleted > 0 {
1021 klog.InfoS("Deleting stale nftables chains", "numChains", deleted)
1022 err := proxier.nftables.Run(context.TODO(), tx)
1023 if err != nil {
1024
1025
1026
1027 klog.ErrorS(err, "Unable to delete stale chains; will retry later")
1028
1029 }
1030 }
1031 }
1032
1033
1034 tx := proxier.nftables.NewTransaction()
1035 proxier.setupNFTables(tx)
1036
1037
1038 ipX := "ip"
1039 ipvX_addr := "ipv4_addr"
1040 if proxier.ipFamily == v1.IPv6Protocol {
1041 ipX = "ip6"
1042 ipvX_addr = "ipv6_addr"
1043 }
1044
1045
1046 tx.Flush(&knftables.Set{
1047 Name: clusterIPsSet,
1048 })
1049 tx.Flush(&knftables.Map{
1050 Name: firewallIPsMap,
1051 })
1052 tx.Flush(&knftables.Map{
1053 Name: noEndpointServicesMap,
1054 })
1055 tx.Flush(&knftables.Map{
1056 Name: noEndpointNodePortsMap,
1057 })
1058 tx.Flush(&knftables.Map{
1059 Name: serviceIPsMap,
1060 })
1061 tx.Flush(&knftables.Map{
1062 Name: serviceNodePortsMap,
1063 })
1064
1065
1066 activeChains := sets.New[string]()
1067 activeAffinitySets := sets.New[string]()
1068
1069
1070
1071 totalEndpoints := 0
1072 for svcName := range proxier.svcPortMap {
1073 totalEndpoints += len(proxier.endpointsMap[svcName])
1074 }
1075
1076
1077
1078 serviceNoLocalEndpointsTotalInternal := 0
1079 serviceNoLocalEndpointsTotalExternal := 0
1080
1081
1082 for svcName, svc := range proxier.svcPortMap {
1083 svcInfo, ok := svc.(*servicePortInfo)
1084 if !ok {
1085 klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
1086 continue
1087 }
1088 protocol := strings.ToLower(string(svcInfo.Protocol()))
1089 svcPortNameString := svcInfo.nameString
1090
1091
1092
1093
1094
1095 allEndpoints := proxier.endpointsMap[svcName]
1096 clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
1097
1098
1099 for _, ep := range allLocallyReachableEndpoints {
1100 if epInfo, ok := ep.(*endpointInfo); ok {
1101 ensureChain(epInfo.chainName, tx, activeChains)
1102 }
1103 }
1104
1105
1106 clusterPolicyChain := svcInfo.clusterPolicyChainName
1107 usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
1108 if usesClusterPolicyChain {
1109 ensureChain(clusterPolicyChain, tx, activeChains)
1110 }
1111
1112
1113 localPolicyChain := svcInfo.localPolicyChainName
1114 usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
1115 if usesLocalPolicyChain {
1116 ensureChain(localPolicyChain, tx, activeChains)
1117 }
1118
1119
1120
1121
1122
1123
1124
1125 internalPolicyChain := clusterPolicyChain
1126 hasInternalEndpoints := hasEndpoints
1127 if svcInfo.InternalPolicyLocal() {
1128 internalPolicyChain = localPolicyChain
1129 if len(localEndpoints) == 0 {
1130 hasInternalEndpoints = false
1131 }
1132 }
1133 internalTrafficChain := internalPolicyChain
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143 externalPolicyChain := clusterPolicyChain
1144 hasExternalEndpoints := hasEndpoints
1145 if svcInfo.ExternalPolicyLocal() {
1146 externalPolicyChain = localPolicyChain
1147 if len(localEndpoints) == 0 {
1148 hasExternalEndpoints = false
1149 }
1150 }
1151 externalTrafficChain := svcInfo.externalChainName
1152
1153
1154
1155
1156 usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
1157 if usesExternalTrafficChain {
1158 ensureChain(externalTrafficChain, tx, activeChains)
1159 }
1160
1161 var internalTrafficFilterVerdict, externalTrafficFilterVerdict string
1162 if !hasEndpoints {
1163
1164
1165
1166
1167
1168 internalTrafficFilterVerdict = fmt.Sprintf("goto %s", rejectChain)
1169 externalTrafficFilterVerdict = fmt.Sprintf("goto %s", rejectChain)
1170 } else {
1171 if !hasInternalEndpoints {
1172
1173
1174
1175 internalTrafficFilterVerdict = "drop"
1176 serviceNoLocalEndpointsTotalInternal++
1177 }
1178 if !hasExternalEndpoints {
1179
1180
1181
1182
1183 externalTrafficFilterVerdict = "drop"
1184 serviceNoLocalEndpointsTotalExternal++
1185 }
1186 }
1187
1188
1189 tx.Add(&knftables.Element{
1190 Set: clusterIPsSet,
1191 Key: []string{svcInfo.ClusterIP().String()},
1192 })
1193 if hasInternalEndpoints {
1194 tx.Add(&knftables.Element{
1195 Map: serviceIPsMap,
1196 Key: []string{
1197 svcInfo.ClusterIP().String(),
1198 protocol,
1199 strconv.Itoa(svcInfo.Port()),
1200 },
1201 Value: []string{
1202 fmt.Sprintf("goto %s", internalTrafficChain),
1203 },
1204 })
1205 } else {
1206
1207 tx.Add(&knftables.Element{
1208 Map: noEndpointServicesMap,
1209 Key: []string{
1210 svcInfo.ClusterIP().String(),
1211 protocol,
1212 strconv.Itoa(svcInfo.Port()),
1213 },
1214 Value: []string{
1215 internalTrafficFilterVerdict,
1216 },
1217 Comment: &svcPortNameString,
1218 })
1219 }
1220
1221
1222 for _, externalIP := range svcInfo.ExternalIPs() {
1223 if hasEndpoints {
1224
1225
1226 tx.Add(&knftables.Element{
1227 Map: serviceIPsMap,
1228 Key: []string{
1229 externalIP.String(),
1230 protocol,
1231 strconv.Itoa(svcInfo.Port()),
1232 },
1233 Value: []string{
1234 fmt.Sprintf("goto %s", externalTrafficChain),
1235 },
1236 })
1237 }
1238 if !hasExternalEndpoints {
1239
1240
1241
1242 tx.Add(&knftables.Element{
1243 Map: noEndpointServicesMap,
1244 Key: []string{
1245 externalIP.String(),
1246 protocol,
1247 strconv.Itoa(svcInfo.Port()),
1248 },
1249 Value: []string{
1250 externalTrafficFilterVerdict,
1251 },
1252 Comment: &svcPortNameString,
1253 })
1254 }
1255 }
1256
1257 usesFWChain := len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
1258 fwChain := svcInfo.firewallChainName
1259 if usesFWChain {
1260 ensureChain(fwChain, tx, activeChains)
1261 var sources []string
1262 allowFromNode := false
1263 for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
1264 if len(sources) > 0 {
1265 sources = append(sources, ",")
1266 }
1267 sources = append(sources, cidr.String())
1268 if cidr.Contains(proxier.nodeIP) {
1269 allowFromNode = true
1270 }
1271 }
1272
1273
1274
1275
1276
1277 if allowFromNode {
1278 for _, lbip := range svcInfo.LoadBalancerVIPs() {
1279 sources = append(sources, ",", lbip.String())
1280 }
1281 }
1282 tx.Add(&knftables.Rule{
1283 Chain: fwChain,
1284 Rule: knftables.Concat(
1285 ipX, "saddr", "!=", "{", sources, "}",
1286 "drop",
1287 ),
1288 })
1289 }
1290
1291
1292 for _, lbip := range svcInfo.LoadBalancerVIPs() {
1293 if hasEndpoints {
1294 tx.Add(&knftables.Element{
1295 Map: serviceIPsMap,
1296 Key: []string{
1297 lbip.String(),
1298 protocol,
1299 strconv.Itoa(svcInfo.Port()),
1300 },
1301 Value: []string{
1302 fmt.Sprintf("goto %s", externalTrafficChain),
1303 },
1304 })
1305 }
1306
1307 if usesFWChain {
1308 tx.Add(&knftables.Element{
1309 Map: firewallIPsMap,
1310 Key: []string{
1311 lbip.String(),
1312 protocol,
1313 strconv.Itoa(svcInfo.Port()),
1314 },
1315 Value: []string{
1316 fmt.Sprintf("goto %s", fwChain),
1317 },
1318 Comment: &svcPortNameString,
1319 })
1320 }
1321 }
1322 if !hasExternalEndpoints {
1323
1324
1325
1326 for _, lbip := range svcInfo.LoadBalancerVIPs() {
1327 tx.Add(&knftables.Element{
1328 Map: noEndpointServicesMap,
1329 Key: []string{
1330 lbip.String(),
1331 protocol,
1332 strconv.Itoa(svcInfo.Port()),
1333 },
1334 Value: []string{
1335 externalTrafficFilterVerdict,
1336 },
1337 Comment: &svcPortNameString,
1338 })
1339 }
1340 }
1341
1342
1343 if svcInfo.NodePort() != 0 {
1344 if hasEndpoints {
1345
1346
1347
1348 tx.Add(&knftables.Element{
1349 Map: serviceNodePortsMap,
1350 Key: []string{
1351 protocol,
1352 strconv.Itoa(svcInfo.NodePort()),
1353 },
1354 Value: []string{
1355 fmt.Sprintf("goto %s", externalTrafficChain),
1356 },
1357 })
1358 }
1359 if !hasExternalEndpoints {
1360
1361
1362
1363 tx.Add(&knftables.Element{
1364 Map: noEndpointNodePortsMap,
1365 Key: []string{
1366 protocol,
1367 strconv.Itoa(svcInfo.NodePort()),
1368 },
1369 Value: []string{
1370 externalTrafficFilterVerdict,
1371 },
1372 Comment: &svcPortNameString,
1373 })
1374 }
1375 }
1376
1377
1378 if hasInternalEndpoints {
1379 if proxier.masqueradeAll {
1380 tx.Add(&knftables.Rule{
1381 Chain: internalTrafficChain,
1382 Rule: knftables.Concat(
1383 ipX, "daddr", svcInfo.ClusterIP(),
1384 protocol, "dport", svcInfo.Port(),
1385 "jump", markMasqChain,
1386 ),
1387 })
1388 } else if proxier.localDetector.IsImplemented() {
1389
1390
1391
1392
1393
1394 tx.Add(&knftables.Rule{
1395 Chain: internalTrafficChain,
1396 Rule: knftables.Concat(
1397 ipX, "daddr", svcInfo.ClusterIP(),
1398 protocol, "dport", svcInfo.Port(),
1399 proxier.localDetector.IfNotLocalNFT(),
1400 "jump", markMasqChain,
1401 ),
1402 })
1403 }
1404 }
1405
1406
1407
1408
1409
1410 if usesExternalTrafficChain {
1411 if !svcInfo.ExternalPolicyLocal() {
1412
1413
1414 tx.Add(&knftables.Rule{
1415 Chain: externalTrafficChain,
1416 Rule: knftables.Concat(
1417 "jump", markMasqChain,
1418 ),
1419 })
1420 } else {
1421
1422
1423
1424 if proxier.localDetector.IsImplemented() {
1425
1426
1427
1428
1429 tx.Add(&knftables.Rule{
1430 Chain: externalTrafficChain,
1431 Rule: knftables.Concat(
1432 proxier.localDetector.IfLocalNFT(),
1433 "goto", clusterPolicyChain,
1434 ),
1435 Comment: ptr.To("short-circuit pod traffic"),
1436 })
1437 }
1438
1439
1440
1441
1442 tx.Add(&knftables.Rule{
1443 Chain: externalTrafficChain,
1444 Rule: knftables.Concat(
1445 "fib", "saddr", "type", "local",
1446 "jump", markMasqChain,
1447 ),
1448 Comment: ptr.To("masquerade local traffic"),
1449 })
1450
1451
1452
1453
1454 tx.Add(&knftables.Rule{
1455 Chain: externalTrafficChain,
1456 Rule: knftables.Concat(
1457 "fib", "saddr", "type", "local",
1458 "goto", clusterPolicyChain,
1459 ),
1460 Comment: ptr.To("short-circuit local traffic"),
1461 })
1462 }
1463
1464
1465 if hasExternalEndpoints {
1466 tx.Add(&knftables.Rule{
1467 Chain: externalTrafficChain,
1468 Rule: knftables.Concat(
1469 "goto", externalPolicyChain,
1470 ),
1471 })
1472 }
1473 }
1474
1475 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1476
1477 for _, ep := range allLocallyReachableEndpoints {
1478 epInfo, ok := ep.(*endpointInfo)
1479 if !ok {
1480 klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
1481 continue
1482 }
1483
1484
1485
1486
1487
1488
1489
1490
1491 tx.Add(&knftables.Set{
1492 Name: epInfo.affinitySetName,
1493 Type: ipvX_addr,
1494 Flags: []knftables.SetFlag{
1495
1496
1497
1498
1499
1500
1501
1502
1503 knftables.DynamicFlag,
1504 knftables.TimeoutFlag,
1505 },
1506 Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second),
1507 })
1508 activeAffinitySets.Insert(epInfo.affinitySetName)
1509 }
1510 }
1511
1512
1513
1514 if usesClusterPolicyChain {
1515 proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
1516 }
1517
1518
1519
1520 if usesLocalPolicyChain {
1521 proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
1522 }
1523
1524
1525 for _, ep := range allLocallyReachableEndpoints {
1526 epInfo, ok := ep.(*endpointInfo)
1527 if !ok {
1528 klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
1529 continue
1530 }
1531
1532 endpointChain := epInfo.chainName
1533
1534
1535 tx.Add(&knftables.Rule{
1536 Chain: endpointChain,
1537 Rule: knftables.Concat(
1538 ipX, "saddr", epInfo.IP(),
1539 "jump", markMasqChain,
1540 ),
1541 })
1542
1543
1544 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1545 tx.Add(&knftables.Rule{
1546 Chain: endpointChain,
1547 Rule: knftables.Concat(
1548 "update", "@", epInfo.affinitySetName,
1549 "{", ipX, "saddr", "}",
1550 ),
1551 })
1552 }
1553
1554
1555 tx.Add(&knftables.Rule{
1556 Chain: endpointChain,
1557 Rule: knftables.Concat(
1558 "meta l4proto", protocol,
1559 "dnat to", epInfo.String(),
1560 ),
1561 })
1562 }
1563 }
1564
1565
1566
1567
1568
1569
1570
1571 existingChains, err := proxier.nftables.List(context.TODO(), "chains")
1572 if err == nil {
1573 for _, chain := range existingChains {
1574 if isServiceChainName(chain) {
1575 if !activeChains.Has(chain) {
1576 tx.Flush(&knftables.Chain{
1577 Name: chain,
1578 })
1579 proxier.staleChains[chain] = start
1580 } else {
1581 delete(proxier.staleChains, chain)
1582 }
1583 }
1584 }
1585 } else if !knftables.IsNotFound(err) {
1586 klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted")
1587 }
1588
1589
1590 existingSets, err := proxier.nftables.List(context.TODO(), "sets")
1591 if err == nil {
1592 for _, set := range existingSets {
1593 if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
1594 tx.Delete(&knftables.Set{
1595 Name: set,
1596 })
1597 }
1598 }
1599 } else if !knftables.IsNotFound(err) {
1600 klog.ErrorS(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
1601 }
1602
1603
1604 klog.V(2).InfoS("Reloading service nftables data",
1605 "numServices", len(proxier.svcPortMap),
1606 "numEndpoints", totalEndpoints,
1607 )
1608
1609
1610
1611
1612 err = proxier.nftables.Run(context.TODO(), tx)
1613 if err != nil {
1614 klog.ErrorS(err, "nftables sync failed")
1615 metrics.IptablesRestoreFailuresTotal.Inc()
1616 return
1617 }
1618 success = true
1619
1620 for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
1621 for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
1622 latency := metrics.SinceInSeconds(lastChangeTriggerTime)
1623 metrics.NetworkProgrammingLatency.Observe(latency)
1624 klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
1625 }
1626 }
1627
1628 metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
1629 metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
1630 if proxier.healthzServer != nil {
1631 proxier.healthzServer.Updated(proxier.ipFamily)
1632 }
1633 metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
1634
1635
1636
1637
1638 if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
1639 klog.ErrorS(err, "Error syncing healthcheck services")
1640 }
1641 if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
1642 klog.ErrorS(err, "Error syncing healthcheck endpoints")
1643 }
1644
1645
1646 conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
1647 }
1648
1649 func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
1650
1651 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1652 ipX := "ip"
1653 if proxier.ipFamily == v1.IPv6Protocol {
1654 ipX = "ip6"
1655 }
1656
1657 for _, ep := range endpoints {
1658 epInfo, ok := ep.(*endpointInfo)
1659 if !ok {
1660 continue
1661 }
1662
1663 tx.Add(&knftables.Rule{
1664 Chain: svcChain,
1665 Rule: knftables.Concat(
1666 ipX, "saddr", "@", epInfo.affinitySetName,
1667 "goto", epInfo.chainName,
1668 ),
1669 })
1670 }
1671 }
1672
1673
1674 var elements []string
1675 for i, ep := range endpoints {
1676 epInfo, ok := ep.(*endpointInfo)
1677 if !ok {
1678 continue
1679 }
1680
1681 elements = append(elements,
1682 strconv.Itoa(i), ":", "goto", epInfo.chainName,
1683 )
1684 if i != len(endpoints)-1 {
1685 elements = append(elements, ",")
1686 }
1687 }
1688 tx.Add(&knftables.Rule{
1689 Chain: svcChain,
1690 Rule: knftables.Concat(
1691 "numgen random mod", len(endpoints), "vmap",
1692 "{", elements, "}",
1693 ),
1694 })
1695 }
1696
View as plain text