1
2
3
4
19
20 package ipvs
21
22 import (
23 "bytes"
24 "errors"
25 "fmt"
26 "io"
27 "net"
28 "reflect"
29 "strconv"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "k8s.io/klog/v2"
36 utilexec "k8s.io/utils/exec"
37 netutils "k8s.io/utils/net"
38
39 v1 "k8s.io/api/core/v1"
40 discovery "k8s.io/api/discovery/v1"
41 "k8s.io/apimachinery/pkg/types"
42 "k8s.io/apimachinery/pkg/util/sets"
43 "k8s.io/apimachinery/pkg/util/version"
44 "k8s.io/apimachinery/pkg/util/wait"
45 "k8s.io/client-go/tools/events"
46 utilsysctl "k8s.io/component-helpers/node/util/sysctl"
47 "k8s.io/kubernetes/pkg/proxy"
48 "k8s.io/kubernetes/pkg/proxy/conntrack"
49 "k8s.io/kubernetes/pkg/proxy/healthcheck"
50 utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
51 utilipvs "k8s.io/kubernetes/pkg/proxy/ipvs/util"
52 "k8s.io/kubernetes/pkg/proxy/metaproxier"
53 "k8s.io/kubernetes/pkg/proxy/metrics"
54 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
55 proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
56 "k8s.io/kubernetes/pkg/util/async"
57 utiliptables "k8s.io/kubernetes/pkg/util/iptables"
58 utilkernel "k8s.io/kubernetes/pkg/util/kernel"
59 )
60
const (
	// kubeServicesChain is the nat chain that OUTPUT and PREROUTING jump to
	// (see iptablesJumpChain, comment "kubernetes service portals").
	kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"

	// kubeProxyFirewallChain is the filter chain that INPUT and FORWARD jump
	// to (see iptablesJumpChain, comment "kube-proxy firewall rules").
	kubeProxyFirewallChain utiliptables.Chain = "KUBE-PROXY-FIREWALL"

	// kubeSourceRangesFirewallChain is the filter chain that
	// kubeProxyFirewallChain hands off to for entries of kubeLoadBalancerFWSet
	// (see ipsetWithIptablesChain).
	kubeSourceRangesFirewallChain utiliptables.Chain = "KUBE-SOURCE-RANGES-FIREWALL"

	// kubePostroutingChain is the nat chain that POSTROUTING jumps to.
	kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"

	// kubeMarkMasqChain is the nat chain used as the target for the node-port
	// ipset matches in ipsetWithIptablesChain. It is created via
	// iptablesChains but is not listed in iptablesCleanupChains.
	kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"

	// kubeNodePortChain exists in both the nat and filter tables (see
	// iptablesChains); filter INPUT jumps to it for health check rules.
	kubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"

	// kubeForwardChain is the filter chain that FORWARD jumps to.
	kubeForwardChain utiliptables.Chain = "KUBE-FORWARD"

	// kubeLoadBalancerChain is the nat chain that kubeServicesChain hands off
	// to for entries of kubeLoadBalancerSet (see ipsetWithIptablesChain).
	kubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER"

	// kubeIPVSFilterChain is the filter chain that INPUT jumps to (see
	// iptablesJumpChain, comment "kubernetes ipvs access filter").
	kubeIPVSFilterChain utiliptables.Chain = "KUBE-IPVS-FILTER"

	// kubeIPVSOutFilterChain is the filter chain that OUTPUT jumps to (see
	// iptablesJumpChain, comment "kubernetes ipvs access filter").
	kubeIPVSOutFilterChain utiliptables.Chain = "KUBE-IPVS-OUT-FILTER"

	// defaultScheduler is the IPVS scheduler used when the caller passes an
	// empty scheduler name (see NewProxier and CanUseIPVSProxier).
	defaultScheduler = "rr"

	// defaultDummyDevice is the name of the dummy network interface that
	// syncProxyRules ensures exists and CleanupLeftovers deletes.
	defaultDummyDevice = "kube-ipvs0"
)
100
101
// Sysctl keys that NewProxier configures through proxyutil.EnsureSysctl.
const (
	// Set to 1 unconditionally.
	sysctlVSConnTrack = "net/ipv4/vs/conntrack"
	// Set to 0 only on kernels in the range selected by the version checks
	// in NewProxier; otherwise left untouched.
	sysctlConnReuse = "net/ipv4/vs/conn_reuse_mode"
	// Set to 1 unconditionally.
	sysctlExpireNoDestConn = "net/ipv4/vs/expire_nodest_conn"
	// Set to 1 unconditionally.
	sysctlExpireQuiescentTemplate = "net/ipv4/vs/expire_quiescent_template"
	// Set to 1 unconditionally.
	sysctlForward = "net/ipv4/ip_forward"
	// Set to 1 only when strictARP is requested.
	sysctlArpIgnore = "net/ipv4/conf/all/arp_ignore"
	// Set to 2 only when strictARP is requested.
	sysctlArpAnnounce = "net/ipv4/conf/all/arp_announce"
)
111
112
113 func NewDualStackProxier(
114 ipt [2]utiliptables.Interface,
115 ipvs utilipvs.Interface,
116 ipset utilipset.Interface,
117 sysctl utilsysctl.Interface,
118 exec utilexec.Interface,
119 syncPeriod time.Duration,
120 minSyncPeriod time.Duration,
121 excludeCIDRs []string,
122 strictARP bool,
123 tcpTimeout time.Duration,
124 tcpFinTimeout time.Duration,
125 udpTimeout time.Duration,
126 masqueradeAll bool,
127 masqueradeBit int,
128 localDetectors [2]proxyutiliptables.LocalTrafficDetector,
129 hostname string,
130 nodeIPs map[v1.IPFamily]net.IP,
131 recorder events.EventRecorder,
132 healthzServer *healthcheck.ProxierHealthServer,
133 scheduler string,
134 nodePortAddresses []string,
135 initOnly bool,
136 ) (proxy.Provider, error) {
137
138 ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], ipvs, ipset, sysctl,
139 exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
140 tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
141 localDetectors[0], hostname, nodeIPs[v1.IPv4Protocol], recorder,
142 healthzServer, scheduler, nodePortAddresses, initOnly)
143 if err != nil {
144 return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
145 }
146
147 ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], ipvs, ipset, sysctl,
148 exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
149 tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
150 localDetectors[1], hostname, nodeIPs[v1.IPv6Protocol], recorder,
151 healthzServer, scheduler, nodePortAddresses, initOnly)
152 if err != nil {
153 return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
154 }
155 if initOnly {
156 return nil, nil
157 }
158
159
160
161 return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
162 }
163
164
165
// Proxier is the IPVS-based proxy for a single IP family. It tracks Service
// and EndpointSlice changes and reconciles them in syncProxyRules.
type Proxier struct {
	// ipFamily is the IP family this proxier was created for.
	ipFamily v1.IPFamily

	// endpointsChanges and serviceChanges accumulate pending changes reported
	// by the On* handlers; syncProxyRules applies them to endpointsMap and
	// svcPortMap via their Update methods.
	endpointsChanges *proxy.EndpointsChangeTracker
	serviceChanges   *proxy.ServiceChangeTracker

	// mu is held for the whole of syncProxyRules, and by the handlers while
	// they write nodeLabels or the *Synced flags below.
	mu           sync.Mutex
	svcPortMap   proxy.ServicePortMap
	endpointsMap proxy.EndpointsMap
	nodeLabels   map[string]string

	// initialSync is set to true by NewProxier and cleared by a deferred
	// function in the first syncProxyRules run that gets past the
	// initialization check.
	initialSync bool

	// endpointSlicesSynced and servicesSynced are set by
	// OnEndpointSlicesSynced and OnServiceSynced respectively.
	// initialized is accessed atomically through setInitialized and
	// isInitialized; each *Synced handler sets it to the other handler's
	// flag, so it becomes non-zero once both have fired.
	endpointSlicesSynced bool
	servicesSynced       bool
	initialized          int32
	syncRunner           *async.BoundedFrequencyRunner // runs syncProxyRules, rate-bounded by the periods below

	// These are effectively const and do not need the mutex to be held.
	syncPeriod    time.Duration
	minSyncPeriod time.Duration
	// excludeCIDRs holds the parsed form of the excludeCIDRs strings given
	// to NewProxier.
	excludeCIDRs []*net.IPNet
	// NOTE(review): NewProxier does not populate strictARP in its struct
	// literal, so this field stays false — confirm whether it is still read.
	strictARP     bool
	iptables      utiliptables.Interface
	ipvs          utilipvs.Interface
	ipset         utilipset.Interface
	conntrack     conntrack.Interface
	masqueradeAll bool
	// masqueradeMark is 1<<masqueradeBit formatted with "%#08x".
	masqueradeMark string
	localDetector  proxyutiliptables.LocalTrafficDetector
	hostname       string
	nodeIP         net.IP
	recorder       events.EventRecorder

	serviceHealthServer healthcheck.ServiceHealthServer
	healthzServer       *healthcheck.ProxierHealthServer

	// ipvsScheduler is the scheduler name put on every virtual server.
	ipvsScheduler string

	// Buffers allocated once in NewProxier. The four LineBuffers are Reset
	// at the start of every syncProxyRules run.
	iptablesData     *bytes.Buffer
	filterChainsData *bytes.Buffer
	natChains        proxyutil.LineBuffer
	filterChains     proxyutil.LineBuffer
	natRules         proxyutil.LineBuffer
	filterRules      proxyutil.LineBuffer

	// netlinkHandle is used to manage the dummy device and query local
	// addresses.
	netlinkHandle NetLinkHandle

	// ipsetList maps ipset name to its IPSet; built from ipsetInfo.
	ipsetList map[string]*IPSet

	// nodePortAddresses selects the node addresses used for NodePort
	// services.
	nodePortAddresses *proxyutil.NodePortAddresses

	// networkInterfacer is proxyutil.RealNetwork{} in NewProxier.
	// presumably swappable for tests — confirm.
	networkInterfacer     proxyutil.NetworkInterfacer
	gracefuldeleteManager *GracefulTerminationManager

	// serviceNoLocalEndpointsInternal is replaced with an empty set at the
	// start of every syncProxyRules run.
	// NOTE(review): presumably collects services with internal traffic
	// policy Local that have no local endpoints — confirm against the code
	// that populates it.
	serviceNoLocalEndpointsInternal sets.Set[string]

	// serviceNoLocalEndpointsExternal is replaced with an empty set at the
	// start of every syncProxyRules run.
	// NOTE(review): presumably the external-traffic-policy counterpart of
	// the field above — confirm.
	serviceNoLocalEndpointsExternal sets.Set[string]

	// lbNoNodeAccessIPPortProtocolEntries is replaced with an empty slice at
	// the start of every syncProxyRules run; how it is filled is not visible
	// from this part of the file.
	lbNoNodeAccessIPPortProtocolEntries []*utilipset.Entry
}
255
256
// Compile-time assertion that *Proxier implements proxy.Provider.
var _ proxy.Provider = &Proxier{}
258
259
260
261
262
263
// NewProxier configures the host for IPVS proxying and returns a Proxier for
// the given ipFamily.
//
// It first applies the required sysctls and, if any timeout is positive, the
// IPVS timeouts. A sysctl failure aborts with an error; a timeout
// configuration failure is only logged. When initOnly is true it stops after
// this host setup and returns (nil, nil).
//
// Otherwise it builds the Proxier, creates one IPSet wrapper per ipsetInfo
// entry, sets up the bounded-frequency sync runner around syncProxyRules, and
// starts the graceful termination manager. An empty scheduler falls back to
// defaultScheduler.
func NewProxier(ipFamily v1.IPFamily,
	ipt utiliptables.Interface,
	ipvs utilipvs.Interface,
	ipset utilipset.Interface,
	sysctl utilsysctl.Interface,
	exec utilexec.Interface,
	syncPeriod time.Duration,
	minSyncPeriod time.Duration,
	excludeCIDRs []string,
	strictARP bool,
	tcpTimeout time.Duration,
	tcpFinTimeout time.Duration,
	udpTimeout time.Duration,
	masqueradeAll bool,
	masqueradeBit int,
	localDetector proxyutiliptables.LocalTrafficDetector,
	hostname string,
	nodeIP net.IP,
	recorder events.EventRecorder,
	healthzServer *healthcheck.ProxierHealthServer,
	scheduler string,
	nodePortAddressStrings []string,
	initOnly bool,
) (*Proxier, error) {
	// Enable connection tracking for IPVS.
	if err := proxyutil.EnsureSysctl(sysctl, sysctlVSConnTrack, 1); err != nil {
		return nil, err
	}

	kernelVersion, err := utilkernel.GetVersion()
	if err != nil {
		return nil, fmt.Errorf("failed to get kernel version: %w", err)
	}

	// conn_reuse_mode is only forced to 0 on kernels that support the sysctl
	// but are older than IPVSConnReuseModeFixedKernelVersion.
	if kernelVersion.LessThan(version.MustParseGeneric(utilkernel.IPVSConnReuseModeMinSupportedKernelVersion)) {
		klog.ErrorS(nil, "Can't set sysctl, kernel version doesn't satisfy minimum version requirements", "sysctl", sysctlConnReuse, "minimumKernelVersion", utilkernel.IPVSConnReuseModeMinSupportedKernelVersion)
	} else if kernelVersion.AtLeast(version.MustParseGeneric(utilkernel.IPVSConnReuseModeFixedKernelVersion)) {
		// Kernel is at or past the "fixed" version: keep whatever value is
		// already configured.
		klog.V(2).InfoS("Left as-is", "sysctl", sysctlConnReuse)
	} else {
		// Kernel supports the sysctl but predates the fixed version.
		if err := proxyutil.EnsureSysctl(sysctl, sysctlConnReuse, 0); err != nil {
			return nil, err
		}
	}

	// Set the expire_nodest_conn sysctl we need for IPVS.
	if err := proxyutil.EnsureSysctl(sysctl, sysctlExpireNoDestConn, 1); err != nil {
		return nil, err
	}

	// Set the expire_quiescent_template sysctl we need for IPVS.
	if err := proxyutil.EnsureSysctl(sysctl, sysctlExpireQuiescentTemplate, 1); err != nil {
		return nil, err
	}

	// Set the ip_forward sysctl we need for IPVS.
	if err := proxyutil.EnsureSysctl(sysctl, sysctlForward, 1); err != nil {
		return nil, err
	}

	if strictARP {
		// Set the arp_ignore sysctl we need for strict ARP.
		if err := proxyutil.EnsureSysctl(sysctl, sysctlArpIgnore, 1); err != nil {
			return nil, err
		}

		// Set the arp_announce sysctl we need for strict ARP.
		if err := proxyutil.EnsureSysctl(sysctl, sysctlArpAnnounce, 2); err != nil {
			return nil, err
		}
	}

	// Configure IPVS timeouts only if at least one of them is set. A failure
	// here is logged but does not abort proxier creation.
	if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
		if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
			klog.ErrorS(err, "Failed to configure IPVS timeouts")
		}
	}

	// In init-only mode the host setup above is all that is wanted.
	if initOnly {
		klog.InfoS("System initialized and --init-only specified")
		return nil, nil
	}

	// Generate the masquerade mark to use for SNAT rules.
	masqueradeValue := 1 << uint(masqueradeBit)
	masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)

	klog.V(2).InfoS("Record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)

	if len(scheduler) == 0 {
		klog.InfoS("IPVS scheduler not specified, use rr by default")
		scheduler = defaultScheduler
	}

	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings, nil)

	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)

	// NOTE(review): the parse error is discarded here; presumably
	// excludeCIDRs was already validated by the caller — confirm.
	parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs)

	// NOTE(review): the strictARP field of Proxier is not set below.
	proxier := &Proxier{
		ipFamily:              ipFamily,
		svcPortMap:            make(proxy.ServicePortMap),
		serviceChanges:        proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
		endpointsMap:          make(proxy.EndpointsMap),
		endpointsChanges:      proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil),
		initialSync:           true,
		syncPeriod:            syncPeriod,
		minSyncPeriod:         minSyncPeriod,
		excludeCIDRs:          parsedExcludeCIDRs,
		iptables:              ipt,
		masqueradeAll:         masqueradeAll,
		masqueradeMark:        masqueradeMark,
		conntrack:             conntrack.NewExec(exec),
		localDetector:         localDetector,
		hostname:              hostname,
		nodeIP:                nodeIP,
		recorder:              recorder,
		serviceHealthServer:   serviceHealthServer,
		healthzServer:         healthzServer,
		ipvs:                  ipvs,
		ipvsScheduler:         scheduler,
		iptablesData:          bytes.NewBuffer(nil),
		filterChainsData:      bytes.NewBuffer(nil),
		natChains:             proxyutil.NewLineBuffer(),
		natRules:              proxyutil.NewLineBuffer(),
		filterChains:          proxyutil.NewLineBuffer(),
		filterRules:           proxyutil.NewLineBuffer(),
		netlinkHandle:         NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
		ipset:                 ipset,
		nodePortAddresses:     nodePortAddresses,
		networkInterfacer:     proxyutil.RealNetwork{},
		gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
	}
	// Build one IPSet wrapper per entry in ipsetInfo, keyed by set name.
	proxier.ipsetList = make(map[string]*IPSet)
	for _, is := range ipsetInfo {
		proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment)
	}
	burstSyncs := 2
	klog.V(2).InfoS("ipvs sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
	proxier.gracefuldeleteManager.Run()
	return proxier, nil
}
414
415 func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
416 var filteredCIDRs []string
417 for _, cidr := range cidrs {
418 if netutils.IsIPv6CIDRString(cidr) == wantIPv6 {
419 filteredCIDRs = append(filteredCIDRs, cidr)
420 }
421 }
422 return filteredCIDRs
423 }
424
425
426
427
// iptablesJumpChain lists the jump rules linking built-in chains (from) to
// kube-proxy chains (to) in the given table. cleanupIptablesLeftovers deletes
// each of them by matching on comment and jump target.
var iptablesJumpChain = []struct {
	table   utiliptables.Table
	from    utiliptables.Chain
	to      utiliptables.Chain
	comment string
}{
	{utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
	{utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
	{utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
	{utiliptables.TableFilter, utiliptables.ChainForward, kubeForwardChain, "kubernetes forwarding rules"},
	{utiliptables.TableFilter, utiliptables.ChainInput, kubeNodePortChain, "kubernetes health check rules"},
	{utiliptables.TableFilter, utiliptables.ChainInput, kubeProxyFirewallChain, "kube-proxy firewall rules"},
	{utiliptables.TableFilter, utiliptables.ChainForward, kubeProxyFirewallChain, "kube-proxy firewall rules"},
	{utiliptables.TableFilter, utiliptables.ChainInput, kubeIPVSFilterChain, "kubernetes ipvs access filter"},
	{utiliptables.TableFilter, utiliptables.ChainOutput, kubeIPVSOutFilterChain, "kubernetes ipvs access filter"},
}
444
// iptablesChains lists every kube-proxy chain together with the table it
// lives in. Note that kubeNodePortChain appears in both nat and filter.
// presumably these are the chains the proxier creates on each sync — the
// consuming code is not visible here.
var iptablesChains = []struct {
	table utiliptables.Table
	chain utiliptables.Chain
}{
	{utiliptables.TableNAT, kubeServicesChain},
	{utiliptables.TableNAT, kubePostroutingChain},
	{utiliptables.TableNAT, kubeNodePortChain},
	{utiliptables.TableNAT, kubeLoadBalancerChain},
	{utiliptables.TableNAT, kubeMarkMasqChain},
	{utiliptables.TableFilter, kubeForwardChain},
	{utiliptables.TableFilter, kubeNodePortChain},
	{utiliptables.TableFilter, kubeProxyFirewallChain},
	{utiliptables.TableFilter, kubeSourceRangesFirewallChain},
	{utiliptables.TableFilter, kubeIPVSFilterChain},
	{utiliptables.TableFilter, kubeIPVSOutFilterChain},
}
461
// iptablesCleanupChains lists the chains that cleanupIptablesLeftovers
// flushes and then deletes. It matches iptablesChains except that the nat
// kubeMarkMasqChain is deliberately absent from this list.
// NOTE(review): presumably KUBE-MARK-MASQ is left in place because other
// components share it — confirm.
var iptablesCleanupChains = []struct {
	table utiliptables.Table
	chain utiliptables.Chain
}{
	{utiliptables.TableNAT, kubeServicesChain},
	{utiliptables.TableNAT, kubePostroutingChain},
	{utiliptables.TableNAT, kubeNodePortChain},
	{utiliptables.TableNAT, kubeLoadBalancerChain},
	{utiliptables.TableFilter, kubeForwardChain},
	{utiliptables.TableFilter, kubeNodePortChain},
	{utiliptables.TableFilter, kubeProxyFirewallChain},
	{utiliptables.TableFilter, kubeSourceRangesFirewallChain},
	{utiliptables.TableFilter, kubeIPVSFilterChain},
	{utiliptables.TableFilter, kubeIPVSOutFilterChain},
}
477
478
// ipsetInfo describes every ipset the IPVS proxier manages: its name, its
// set type, and a comment. NewProxier creates one IPSet per entry and
// CleanupLeftovers destroys each set by name.
var ipsetInfo = []struct {
	name    string
	setType utilipset.Type
	comment string
}{
	{kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
	{kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
	{kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
	{kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment},
	{kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
	{kubeLoadBalancerFWSet, utilipset.HashIPPort, kubeLoadBalancerFWSetComment},
	{kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
	{kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
	{kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
	{kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment},
	{kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment},
	{kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment},
	{kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
	{kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
	{kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
	{kubeHealthCheckNodePortSet, utilipset.BitmapPort, kubeHealthCheckNodePortSetComment},
	{kubeIPVSSet, utilipset.HashIP, kubeIPVSSetComment},
}
502
503
504
505
506
507
// ipsetWithIptablesChain pairs an ipset with an iptables rule description:
// the table, the chain the rule sits in (from), the rule's target (to, either
// another chain or a built-in target such as RETURN or MASQUERADE), the ipset
// match direction flags (matchType), and an optional protocol.
// NOTE(review): the code that turns these entries into rules is not visible
// here; presumably each entry becomes one "-m set --match-set" rule —
// confirm.
var ipsetWithIptablesChain = []struct {
	name          string
	table         utiliptables.Table
	from          string
	to            string
	matchType     string
	protocolMatch string
}{
	{kubeLoopBackIPSet, utiliptables.TableNAT, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
	{kubeLoadBalancerSet, utiliptables.TableNAT, string(kubeServicesChain), string(kubeLoadBalancerChain), "dst,dst", ""},
	{kubeLoadBalancerLocalSet, utiliptables.TableNAT, string(kubeLoadBalancerChain), "RETURN", "dst,dst", ""},
	{kubeNodePortLocalSetTCP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst", utilipset.ProtocolTCP},
	{kubeNodePortSetTCP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst", utilipset.ProtocolTCP},
	{kubeNodePortLocalSetUDP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP},
	{kubeNodePortSetUDP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst", utilipset.ProtocolUDP},
	{kubeNodePortLocalSetSCTP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP},
	{kubeNodePortSetSCTP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},

	// Filter-table entries: load balancer source-range firewalling.
	{kubeLoadBalancerFWSet, utiliptables.TableFilter, string(kubeProxyFirewallChain), string(kubeSourceRangesFirewallChain), "dst,dst", ""},
	{kubeLoadBalancerSourceCIDRSet, utiliptables.TableFilter, string(kubeSourceRangesFirewallChain), "RETURN", "dst,dst,src", ""},
	{kubeLoadBalancerSourceIPSet, utiliptables.TableFilter, string(kubeSourceRangesFirewallChain), "RETURN", "dst,dst,src", ""},
}
530
531
// servicePortInfo extends proxy.BaseServicePortInfo with data specific to the
// IPVS proxier.
type servicePortInfo struct {
	*proxy.BaseServicePortInfo
	// nameString is the String() form of the service's ServicePortName,
	// computed once in newServiceInfo.
	nameString string
}
537
538
539 func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
540 svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
541
542
543 svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
544 svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
545 svcPort.nameString = svcPortName.String()
546
547 return svcPort
548 }
549
550
551
// getFirstColumn reads all of r and returns the first whitespace-separated
// field of every line that has at least one field. Lines that are empty or
// contain only whitespace are skipped. A read error is returned as-is with a
// nil slice.
func getFirstColumn(r io.Reader) ([]string, error) {
	data, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}

	rows := strings.Split(string(data), "\n")
	firsts := make([]string, 0, len(rows))
	for _, row := range rows {
		cols := strings.Fields(row)
		if len(cols) == 0 {
			continue
		}
		firsts = append(firsts, cols[0])
	}
	return firsts, nil
}
568
569
570
571
572
573
574 func CanUseIPVSProxier(ipvs utilipvs.Interface, ipsetver IPSetVersioner, scheduler string) error {
575
576
577
578
579
580 if ipvs == nil {
581 return fmt.Errorf("Ipvs not supported by the kernel")
582 }
583
584
585 versionString, err := ipsetver.GetVersion()
586 if err != nil {
587 return fmt.Errorf("error getting ipset version, error: %v", err)
588 }
589 if !checkMinVersion(versionString) {
590 return fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion)
591 }
592
593 if scheduler == "" {
594 scheduler = defaultScheduler
595 }
596
597
598 vservers, err := ipvs.GetVirtualServers()
599 if err != nil {
600 klog.ErrorS(err, "Can't read the ipvs")
601 return err
602 }
603 klog.V(5).InfoS("Virtual Servers", "count", len(vservers))
604 if len(vservers) > 0 {
605
606
607 for _, vs := range vservers {
608 if vs.Scheduler == scheduler {
609 klog.V(5).InfoS("VS exist, Skipping checks")
610 return nil
611 }
612 }
613 klog.V(5).InfoS("No existing VS uses the configured scheduler", "scheduler", scheduler)
614 }
615
616
617
618
619
620
621
622
623
624
625
626
627 vs := utilipvs.VirtualServer{
628 Address: netutils.ParseIPSloppy("198.51.100.0"),
629 Protocol: "TCP",
630 Port: 20000,
631 Scheduler: scheduler,
632 }
633 if err := ipvs.AddVirtualServer(&vs); err != nil {
634 klog.ErrorS(err, "Could not create dummy VS", "scheduler", scheduler)
635 return err
636 }
637
638
639 vservers, err = ipvs.GetVirtualServers()
640 if err != nil {
641 klog.ErrorS(err, "ipvs.GetVirtualServers")
642 return err
643 }
644 klog.V(5).InfoS("Virtual Servers after adding dummy", "count", len(vservers))
645 if len(vservers) == 0 {
646 klog.InfoS("Dummy VS not created", "scheduler", scheduler)
647 return fmt.Errorf("Ipvs not supported")
648 }
649 klog.V(5).InfoS("Dummy VS created", "vs", vs)
650
651 if err := ipvs.DeleteVirtualServer(&vs); err != nil {
652 klog.ErrorS(err, "Could not delete dummy VS")
653 return err
654 }
655
656 return nil
657 }
658
659
660
661 func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
662
663 for _, jc := range iptablesJumpChain {
664 args := []string{
665 "-m", "comment", "--comment", jc.comment,
666 "-j", string(jc.to),
667 }
668 if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
669 if !utiliptables.IsNotFoundError(err) {
670 klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
671 encounteredError = true
672 }
673 }
674 }
675
676
677 for _, ch := range iptablesCleanupChains {
678 if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
679 if !utiliptables.IsNotFoundError(err) {
680 klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
681 encounteredError = true
682 }
683 }
684 }
685
686
687 for _, ch := range iptablesCleanupChains {
688 if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
689 if !utiliptables.IsNotFoundError(err) {
690 klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
691 encounteredError = true
692 }
693 }
694 }
695
696 return encounteredError
697 }
698
699
700 func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) {
701
702 if ipvs != nil {
703 err := ipvs.Flush()
704 if err != nil {
705 klog.ErrorS(err, "Error flushing ipvs rules")
706 encounteredError = true
707 }
708 }
709
710 nl := NewNetLinkHandle(false)
711 err := nl.DeleteDummyDevice(defaultDummyDevice)
712 if err != nil {
713 klog.ErrorS(err, "Error deleting dummy device created by ipvs proxier", "device", defaultDummyDevice)
714 encounteredError = true
715 }
716
717 encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
718
719
720 for _, set := range ipsetInfo {
721 err = ipset.DestroySet(set.name)
722 if err != nil {
723 if !utilipset.IsNotFoundError(err) {
724 klog.ErrorS(err, "Error removing ipset", "ipset", set.name)
725 encounteredError = true
726 }
727 }
728 }
729 return encounteredError
730 }
731
732
// Sync requests a run of syncProxyRules through the bounded-frequency
// syncRunner. Before doing so it tells the healthz server (when configured)
// that an update is queued for this IP family and stamps the
// SyncProxyRulesLastQueuedTimestamp metric.
func (proxier *Proxier) Sync() {
	if proxier.healthzServer != nil {
		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
	}
	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
	proxier.syncRunner.Run()
}
740
741
// SyncLoop runs the syncRunner loop with wait.NeverStop as its stop channel.
// presumably it therefore never returns — confirm against
// async.BoundedFrequencyRunner.Loop.
func (proxier *Proxier) SyncLoop() {
	// Update the healthz timestamp at the beginning in case the first sync
	// takes a while.
	if proxier.healthzServer != nil {
		proxier.healthzServer.Updated(proxier.ipFamily)
	}
	// Seed the "last queued" metric before the loop starts.
	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
	proxier.syncRunner.Loop(wait.NeverStop)
}
751
752 func (proxier *Proxier) setInitialized(value bool) {
753 var initialized int32
754 if value {
755 initialized = 1
756 }
757 atomic.StoreInt32(&proxier.initialized, initialized)
758 }
759
760 func (proxier *Proxier) isInitialized() bool {
761 return atomic.LoadInt32(&proxier.initialized) > 0
762 }
763
764
// OnServiceAdd handles creation of a Service by treating it as an update
// from nil to service.
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
	proxier.OnServiceUpdate(nil, service)
}
768
769
770 func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
771 if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
772 proxier.Sync()
773 }
774 }
775
776
// OnServiceDelete handles deletion of a Service by treating it as an update
// from service to nil.
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
	proxier.OnServiceUpdate(service, nil)
}
780
781
// OnServiceSynced marks the initial Service list as received. The proxier
// becomes initialized here only if the EndpointSlices have already been
// synced too.
func (proxier *Proxier) OnServiceSynced() {
	proxier.mu.Lock()
	proxier.servicesSynced = true
	proxier.setInitialized(proxier.endpointSlicesSynced)
	proxier.mu.Unlock()

	// Sync unconditionally, calling syncProxyRules directly rather than going
	// through syncRunner. syncProxyRules acquires mu itself, so this must
	// happen after the Unlock above.
	proxier.syncProxyRules()
}
791
792
793
794 func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
795 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
796 proxier.Sync()
797 }
798 }
799
800
801
802 func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
803 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
804 proxier.Sync()
805 }
806 }
807
808
809
810 func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
811 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
812 proxier.Sync()
813 }
814 }
815
816
817
// OnEndpointSlicesSynced marks the initial EndpointSlice list as received.
// The proxier becomes initialized here only if the Services have already
// been synced too.
func (proxier *Proxier) OnEndpointSlicesSynced() {
	proxier.mu.Lock()
	proxier.endpointSlicesSynced = true
	proxier.setInitialized(proxier.servicesSynced)
	proxier.mu.Unlock()

	// Sync unconditionally, calling syncProxyRules directly rather than going
	// through syncRunner. syncProxyRules acquires mu itself, so this must
	// happen after the Unlock above.
	proxier.syncProxyRules()
}
827
828
829
830 func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
831 if node.Name != proxier.hostname {
832 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
833 return
834 }
835
836 if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
837 return
838 }
839
840 proxier.mu.Lock()
841 proxier.nodeLabels = map[string]string{}
842 for k, v := range node.Labels {
843 proxier.nodeLabels[k] = v
844 }
845 proxier.mu.Unlock()
846 klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
847
848 proxier.Sync()
849 }
850
851
852
853 func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
854 if node.Name != proxier.hostname {
855 klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
856 return
857 }
858
859 if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
860 return
861 }
862
863 proxier.mu.Lock()
864 proxier.nodeLabels = map[string]string{}
865 for k, v := range node.Labels {
866 proxier.nodeLabels[k] = v
867 }
868 proxier.mu.Unlock()
869 klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
870
871 proxier.Sync()
872 }
873
874
875
// OnNodeDelete handles deletion of a Node object. Events for a node whose
// name differs from proxier.hostname are logged and ignored. Otherwise the
// stored node labels are cleared and a sync is requested.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
	if node.Name != proxier.hostname {
		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
		return
	}

	// Drop the labels under the lock that syncProxyRules also holds.
	proxier.mu.Lock()
	proxier.nodeLabels = nil
	proxier.mu.Unlock()

	proxier.Sync()
}
888
889
890
// OnNodeSynced is intentionally a no-op in this proxier: nothing needs to
// happen once the initial Node list has been delivered.
func (proxier *Proxier) OnNodeSynced() {
}
893
894
895
// OnServiceCIDRsChanged is a no-op: this proxier ignores the list of service
// CIDRs it is given.
func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
897
898
899 func (proxier *Proxier) syncProxyRules() {
900 proxier.mu.Lock()
901 defer proxier.mu.Unlock()
902
903
904 if !proxier.isInitialized() {
905 klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
906 return
907 }
908
909
910
911 defer func() {
912 proxier.initialSync = false
913 }()
914
915
916 start := time.Now()
917 defer func() {
918 metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
919 klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
920 }()
921
922
923
924
925 serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
926 endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
927
928 klog.V(3).InfoS("Syncing ipvs proxier rules")
929
930 proxier.serviceNoLocalEndpointsInternal = sets.New[string]()
931 proxier.serviceNoLocalEndpointsExternal = sets.New[string]()
932
933 proxier.lbNoNodeAccessIPPortProtocolEntries = make([]*utilipset.Entry, 0)
934
935
936
937
938
939 proxier.natChains.Reset()
940 proxier.natRules.Reset()
941 proxier.filterChains.Reset()
942 proxier.filterRules.Reset()
943
944
945 proxier.filterChains.Write("*filter")
946 proxier.natChains.Write("*nat")
947
948 proxier.createAndLinkKubeChain()
949
950
951 _, err := proxier.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
952 if err != nil {
953 klog.ErrorS(err, "Failed to create dummy interface", "interface", defaultDummyDevice)
954 return
955 }
956
957
958 for _, set := range proxier.ipsetList {
959 if err := ensureIPSet(set); err != nil {
960 return
961 }
962 set.resetEntries()
963 }
964
965
966 activeIPVSServices := sets.New[string]()
967
968 activeBindAddrs := sets.New[string]()
969
970 alreadyBoundAddrs, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice)
971 if err != nil {
972 klog.ErrorS(err, "Error listing addresses binded to dummy interface")
973 }
974
975 nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddressesExcept(defaultDummyDevice)
976 if err != nil {
977 klog.ErrorS(err, "Error listing node addresses")
978 }
979
980 hasNodePort := false
981 for _, svc := range proxier.svcPortMap {
982 svcInfo, ok := svc.(*servicePortInfo)
983 if ok && svcInfo.NodePort() != 0 {
984 hasNodePort = true
985 break
986 }
987 }
988
989
990
991 var nodeIPs []net.IP
992 if hasNodePort {
993 if proxier.nodePortAddresses.MatchAll() {
994 for _, ipStr := range nodeAddressSet.UnsortedList() {
995 nodeIPs = append(nodeIPs, netutils.ParseIPSloppy(ipStr))
996 }
997 } else {
998 allNodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
999 if err != nil {
1000 klog.ErrorS(err, "Failed to get node IP address matching nodeport cidr")
1001 } else {
1002 for _, ip := range allNodeIPs {
1003 if !ip.IsLoopback() {
1004 nodeIPs = append(nodeIPs, ip)
1005 }
1006 }
1007 }
1008 }
1009 }
1010
1011
1012 for svcPortName, svcPort := range proxier.svcPortMap {
1013 svcInfo, ok := svcPort.(*servicePortInfo)
1014 if !ok {
1015 klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
1016 continue
1017 }
1018
1019 protocol := strings.ToLower(string(svcInfo.Protocol()))
1020
1021
1022 svcPortNameString := svcPortName.String()
1023
1024
1025 for _, e := range proxier.endpointsMap[svcPortName] {
1026 ep, ok := e.(*proxy.BaseEndpointInfo)
1027 if !ok {
1028 klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e)
1029 continue
1030 }
1031 if !ep.IsLocal() {
1032 continue
1033 }
1034 epIP := ep.IP()
1035 epPort := ep.Port()
1036
1037 if epIP == "" || epPort == 0 {
1038 continue
1039 }
1040 entry := &utilipset.Entry{
1041 IP: epIP,
1042 Port: epPort,
1043 Protocol: protocol,
1044 IP2: epIP,
1045 SetType: utilipset.HashIPPortIP,
1046 }
1047 if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
1048 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
1049 continue
1050 }
1051 proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
1052 }
1053
1054
1055
1056 entry := &utilipset.Entry{
1057 IP: svcInfo.ClusterIP().String(),
1058 Port: svcInfo.Port(),
1059 Protocol: protocol,
1060 SetType: utilipset.HashIPPort,
1061 }
1062
1063
1064 if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
1065 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
1066 continue
1067 }
1068 proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
1069
1070 serv := &utilipvs.VirtualServer{
1071 Address: svcInfo.ClusterIP(),
1072 Port: uint16(svcInfo.Port()),
1073 Protocol: string(svcInfo.Protocol()),
1074 Scheduler: proxier.ipvsScheduler,
1075 }
1076
1077 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1078 serv.Flags |= utilipvs.FlagPersistent
1079 serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
1080 }
1081
1082 if proxier.ipvsScheduler == "mh" {
1083 serv.Flags |= utilipvs.FlagSourceHash
1084 }
1085
1086 if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
1087 activeIPVSServices.Insert(serv.String())
1088 activeBindAddrs.Insert(serv.Address.String())
1089
1090
1091 internalNodeLocal := false
1092 if svcInfo.InternalPolicyLocal() {
1093 internalNodeLocal = true
1094 }
1095 if err := proxier.syncEndpoint(svcPortName, internalNodeLocal, serv); err != nil {
1096 klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
1097 }
1098 } else {
1099 klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
1100 }
1101
1102
1103 for _, externalIP := range svcInfo.ExternalIPs() {
1104
1105 entry := &utilipset.Entry{
1106 IP: externalIP.String(),
1107 Port: svcInfo.Port(),
1108 Protocol: protocol,
1109 SetType: utilipset.HashIPPort,
1110 }
1111
1112 if svcInfo.ExternalPolicyLocal() {
1113 if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
1114 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
1115 continue
1116 }
1117 proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
1118 } else {
1119
1120 if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
1121 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
1122 continue
1123 }
1124 proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
1125 }
1126
1127
1128 serv := &utilipvs.VirtualServer{
1129 Address: externalIP,
1130 Port: uint16(svcInfo.Port()),
1131 Protocol: string(svcInfo.Protocol()),
1132 Scheduler: proxier.ipvsScheduler,
1133 }
1134 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1135 serv.Flags |= utilipvs.FlagPersistent
1136 serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
1137 }
1138
1139 if proxier.ipvsScheduler == "mh" {
1140 serv.Flags |= utilipvs.FlagSourceHash
1141 }
1142
1143 shouldBind := !nodeAddressSet.Has(serv.Address.String())
1144 if err := proxier.syncService(svcPortNameString, serv, shouldBind, alreadyBoundAddrs); err == nil {
1145 activeIPVSServices.Insert(serv.String())
1146 if shouldBind {
1147 activeBindAddrs.Insert(serv.Address.String())
1148 }
1149 if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
1150 klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
1151 }
1152 } else {
1153 klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
1154 }
1155 }
1156
1157
1158 for _, ingress := range svcInfo.LoadBalancerVIPs() {
1159
1160 entry = &utilipset.Entry{
1161 IP: ingress.String(),
1162 Port: svcInfo.Port(),
1163 Protocol: protocol,
1164 SetType: utilipset.HashIPPort,
1165 }
1166
1167
1168
1169
1170 if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
1171 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
1172 continue
1173 }
1174 proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
1175
1176 if svcInfo.ExternalPolicyLocal() {
1177 if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
1178 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
1179 continue
1180 }
1181 proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
1182 }
1183 if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
1184
1185
1186
1187 if valid := proxier.ipsetList[kubeLoadBalancerFWSet].validateEntry(entry); !valid {
1188 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerFWSet].Name)
1189 continue
1190 }
1191 proxier.ipsetList[kubeLoadBalancerFWSet].activeEntries.Insert(entry.String())
1192 allowFromNode := false
1193 for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
1194
1195 entry = &utilipset.Entry{
1196 IP: ingress.String(),
1197 Port: svcInfo.Port(),
1198 Protocol: protocol,
1199 Net: cidr.String(),
1200 SetType: utilipset.HashIPPortNet,
1201 }
1202
1203 if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
1204 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
1205 continue
1206 }
1207 proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
1208
1209 if cidr.Contains(proxier.nodeIP) {
1210 allowFromNode = true
1211 }
1212 }
1213
1214
1215
1216 if allowFromNode {
1217 entry = &utilipset.Entry{
1218 IP: ingress.String(),
1219 Port: svcInfo.Port(),
1220 Protocol: protocol,
1221 IP2: ingress.String(),
1222 SetType: utilipset.HashIPPortIP,
1223 }
1224
1225 if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
1226 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
1227 continue
1228 }
1229 proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
1230 } else {
1231
1232 proxier.lbNoNodeAccessIPPortProtocolEntries = append(proxier.lbNoNodeAccessIPPortProtocolEntries, entry)
1233
1234 }
1235 }
1236
1237 serv := &utilipvs.VirtualServer{
1238 Address: ingress,
1239 Port: uint16(svcInfo.Port()),
1240 Protocol: string(svcInfo.Protocol()),
1241 Scheduler: proxier.ipvsScheduler,
1242 }
1243 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1244 serv.Flags |= utilipvs.FlagPersistent
1245 serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
1246 }
1247
1248 if proxier.ipvsScheduler == "mh" {
1249 serv.Flags |= utilipvs.FlagSourceHash
1250 }
1251
1252 shouldBind := !nodeAddressSet.Has(serv.Address.String())
1253 if err := proxier.syncService(svcPortNameString, serv, shouldBind, alreadyBoundAddrs); err == nil {
1254 activeIPVSServices.Insert(serv.String())
1255 if shouldBind {
1256 activeBindAddrs.Insert(serv.Address.String())
1257 }
1258 if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
1259 klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
1260 }
1261 } else {
1262 klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
1263 }
1264 }
1265
1266 if svcInfo.NodePort() != 0 {
1267 if len(nodeIPs) == 0 {
1268
1269
1270 continue
1271 }
1272
1273
1274
1275
1276 var (
1277 nodePortSet *IPSet
1278 entries []*utilipset.Entry
1279 )
1280
1281 switch protocol {
1282 case utilipset.ProtocolTCP:
1283 nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
1284 entries = []*utilipset.Entry{{
1285
1286 Port: svcInfo.NodePort(),
1287 Protocol: protocol,
1288 SetType: utilipset.BitmapPort,
1289 }}
1290 case utilipset.ProtocolUDP:
1291 nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
1292 entries = []*utilipset.Entry{{
1293
1294 Port: svcInfo.NodePort(),
1295 Protocol: protocol,
1296 SetType: utilipset.BitmapPort,
1297 }}
1298 case utilipset.ProtocolSCTP:
1299 nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
1300
1301 entries = []*utilipset.Entry{}
1302 for _, nodeIP := range nodeIPs {
1303 entries = append(entries, &utilipset.Entry{
1304 IP: nodeIP.String(),
1305 Port: svcInfo.NodePort(),
1306 Protocol: protocol,
1307 SetType: utilipset.HashIPPort,
1308 })
1309 }
1310 default:
1311
1312 klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
1313 }
1314 if nodePortSet != nil {
1315 entryInvalidErr := false
1316 for _, entry := range entries {
1317 if valid := nodePortSet.validateEntry(entry); !valid {
1318 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
1319 entryInvalidErr = true
1320 break
1321 }
1322 nodePortSet.activeEntries.Insert(entry.String())
1323 }
1324 if entryInvalidErr {
1325 continue
1326 }
1327 }
1328
1329
1330 if svcInfo.ExternalPolicyLocal() {
1331 var nodePortLocalSet *IPSet
1332 switch protocol {
1333 case utilipset.ProtocolTCP:
1334 nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
1335 case utilipset.ProtocolUDP:
1336 nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
1337 case utilipset.ProtocolSCTP:
1338 nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
1339 default:
1340
1341 klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
1342 }
1343 if nodePortLocalSet != nil {
1344 entryInvalidErr := false
1345 for _, entry := range entries {
1346 if valid := nodePortLocalSet.validateEntry(entry); !valid {
1347 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortLocalSet.Name)
1348 entryInvalidErr = true
1349 break
1350 }
1351 nodePortLocalSet.activeEntries.Insert(entry.String())
1352 }
1353 if entryInvalidErr {
1354 continue
1355 }
1356 }
1357 }
1358
1359
1360 for _, nodeIP := range nodeIPs {
1361
1362 serv := &utilipvs.VirtualServer{
1363 Address: nodeIP,
1364 Port: uint16(svcInfo.NodePort()),
1365 Protocol: string(svcInfo.Protocol()),
1366 Scheduler: proxier.ipvsScheduler,
1367 }
1368 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
1369 serv.Flags |= utilipvs.FlagPersistent
1370 serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
1371 }
1372
1373 if proxier.ipvsScheduler == "mh" {
1374 serv.Flags |= utilipvs.FlagSourceHash
1375 }
1376
1377 if err := proxier.syncService(svcPortNameString, serv, false, alreadyBoundAddrs); err == nil {
1378 activeIPVSServices.Insert(serv.String())
1379 if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
1380 klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
1381 }
1382 } else {
1383 klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
1384 }
1385 }
1386 }
1387
1388 if svcInfo.HealthCheckNodePort() != 0 {
1389 nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
1390 entry := &utilipset.Entry{
1391
1392 Port: svcInfo.HealthCheckNodePort(),
1393 Protocol: "tcp",
1394 SetType: utilipset.BitmapPort,
1395 }
1396
1397 if valid := nodePortSet.validateEntry(entry); !valid {
1398 klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
1399 continue
1400 }
1401 nodePortSet.activeEntries.Insert(entry.String())
1402 }
1403 }
1404
1405
1406 proxier.ipsetList[kubeIPVSSet].activeEntries = activeBindAddrs
1407
1408
1409 for _, set := range proxier.ipsetList {
1410 set.syncIPSetEntries()
1411 }
1412
1413
1414
1415 proxier.writeIptablesRules()
1416
1417
1418
1419 proxier.iptablesData.Reset()
1420 proxier.iptablesData.Write(proxier.natChains.Bytes())
1421 proxier.iptablesData.Write(proxier.natRules.Bytes())
1422 proxier.iptablesData.Write(proxier.filterChains.Bytes())
1423 proxier.iptablesData.Write(proxier.filterRules.Bytes())
1424
1425 klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
1426 err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
1427 if err != nil {
1428 if pErr, ok := err.(utiliptables.ParseError); ok {
1429 lines := utiliptables.ExtractLines(proxier.iptablesData.Bytes(), pErr.Line(), 3)
1430 klog.ErrorS(pErr, "Failed to execute iptables-restore", "rules", lines)
1431 } else {
1432 klog.ErrorS(err, "Failed to execute iptables-restore", "rules", proxier.iptablesData.Bytes())
1433 }
1434 metrics.IptablesRestoreFailuresTotal.Inc()
1435 return
1436 }
1437 for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
1438 for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
1439 latency := metrics.SinceInSeconds(lastChangeTriggerTime)
1440 metrics.NetworkProgrammingLatency.Observe(latency)
1441 klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
1442 }
1443 }
1444
1445
1446 superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs)
1447 if superfluousAddresses.Len() > 0 {
1448 klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses)
1449 for adr := range superfluousAddresses {
1450 if err := proxier.netlinkHandle.UnbindAddress(adr, defaultDummyDevice); err != nil {
1451 klog.ErrorS(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr)
1452 }
1453 }
1454 }
1455
1456
1457
1458 currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
1459 appliedSvcs, err := proxier.ipvs.GetVirtualServers()
1460 if err == nil {
1461 for _, appliedSvc := range appliedSvcs {
1462 currentIPVSServices[appliedSvc.String()] = appliedSvc
1463 }
1464 } else {
1465 klog.ErrorS(err, "Failed to get ipvs service")
1466 }
1467 proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
1468
1469 if proxier.healthzServer != nil {
1470 proxier.healthzServer.Updated(proxier.ipFamily)
1471 }
1472 metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
1473
1474
1475
1476
1477 if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
1478 klog.ErrorS(err, "Error syncing healthcheck services")
1479 }
1480 if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
1481 klog.ErrorS(err, "Error syncing healthcheck endpoints")
1482 }
1483
1484 metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
1485 metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
1486
1487
1488 conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
1489 }
1490
1491
1492
1493
1494 func (proxier *Proxier) writeIptablesRules() {
1495
1496
1497 loAddr := "127.0.0.0/8"
1498 if proxier.ipFamily == v1.IPv6Protocol {
1499 loAddr = "::1/128"
1500 }
1501 proxier.natRules.Write("-A", string(kubeServicesChain), "-s", loAddr, "-j", "RETURN")
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513 args := make([]string, 64)
1514
1515 for _, set := range ipsetWithIptablesChain {
1516 if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
1517 args = append(args[:0], "-A", set.from)
1518 if set.protocolMatch != "" {
1519 args = append(args, "-p", set.protocolMatch)
1520 }
1521 args = append(args,
1522 "-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
1523 "-m", "set", "--match-set", proxier.ipsetList[set.name].Name,
1524 set.matchType,
1525 )
1526 if set.table == utiliptables.TableFilter {
1527 proxier.filterRules.Write(args, "-j", set.to)
1528 } else {
1529 proxier.natRules.Write(args, "-j", set.to)
1530 }
1531 }
1532 }
1533
1534 if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
1535 args = append(args[:0],
1536 "-A", string(kubeServicesChain),
1537 "-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
1538 "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name,
1539 )
1540 if proxier.masqueradeAll {
1541 proxier.natRules.Write(
1542 args, "dst,dst",
1543 "-j", string(kubeMarkMasqChain))
1544 } else if proxier.localDetector.IsImplemented() {
1545
1546
1547
1548
1549
1550 proxier.natRules.Write(
1551 args, "dst,dst",
1552 proxier.localDetector.IfNotLocal(),
1553 "-j", string(kubeMarkMasqChain))
1554 } else {
1555
1556
1557
1558
1559
1560
1561
1562 proxier.natRules.Write(
1563 args, "src,dst",
1564 "-j", string(kubeMarkMasqChain))
1565 }
1566 }
1567
1568
1569 externalIPRules := func(args []string) {
1570
1571
1572
1573
1574 externalTrafficOnlyArgs := append(args,
1575 "-m", "physdev", "!", "--physdev-is-in",
1576 "-m", "addrtype", "!", "--src-type", "LOCAL")
1577 proxier.natRules.Write(externalTrafficOnlyArgs, "-j", "ACCEPT")
1578 dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
1579
1580
1581 proxier.natRules.Write(dstLocalOnlyArgs, "-j", "ACCEPT")
1582 }
1583
1584 if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
1585
1586 args = append(args[:0],
1587 "-A", string(kubeServicesChain),
1588 "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
1589 "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
1590 "dst,dst",
1591 )
1592 proxier.natRules.Write(args, "-j", string(kubeMarkMasqChain))
1593 externalIPRules(args)
1594 }
1595
1596 if !proxier.ipsetList[kubeExternalIPLocalSet].isEmpty() {
1597 args = append(args[:0],
1598 "-A", string(kubeServicesChain),
1599 "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPLocalSet].getComment(),
1600 "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name,
1601 "dst,dst",
1602 )
1603 externalIPRules(args)
1604 }
1605
1606
1607 args = append(args[:0],
1608 "-A", string(kubeServicesChain),
1609 "-m", "addrtype", "--dst-type", "LOCAL",
1610 )
1611 proxier.natRules.Write(args, "-j", string(kubeNodePortChain))
1612
1613
1614 proxier.natRules.Write(
1615 "-A", string(kubeLoadBalancerChain),
1616 "-j", string(kubeMarkMasqChain),
1617 )
1618
1619
1620 proxier.filterRules.Write(
1621 "-A", string(kubeSourceRangesFirewallChain),
1622 "-j", "DROP",
1623 )
1624
1625
1626
1627 for _, entry := range proxier.lbNoNodeAccessIPPortProtocolEntries {
1628 proxier.filterRules.Write(
1629 "-A", string(kubeIPVSOutFilterChain),
1630 "-s", entry.IP,
1631 "-m", "ipvs", "--vaddr", entry.IP, "--vproto", entry.Protocol, "--vport", strconv.Itoa(entry.Port),
1632 "-j", "DROP",
1633 )
1634 }
1635
1636
1637
1638
1639 proxier.acceptIPVSTraffic()
1640
1641
1642
1643
1644 proxier.filterRules.Write(
1645 "-A", string(kubeForwardChain),
1646 "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
1647 "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
1648 "-j", "ACCEPT",
1649 )
1650
1651
1652
1653 proxier.filterRules.Write(
1654 "-A", string(kubeForwardChain),
1655 "-m", "comment", "--comment", `"kubernetes forwarding conntrack rule"`,
1656 "-m", "conntrack",
1657 "--ctstate", "RELATED,ESTABLISHED",
1658 "-j", "ACCEPT",
1659 )
1660
1661
1662 proxier.filterRules.Write(
1663 "-A", string(kubeNodePortChain),
1664 "-m", "comment", "--comment", proxier.ipsetList[kubeHealthCheckNodePortSet].getComment(),
1665 "-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst",
1666 "-j", "ACCEPT",
1667 )
1668
1669
1670
1671 proxier.filterRules.Write(
1672 "-A", string(kubeIPVSFilterChain),
1673 "-m", "set", "--match-set", proxier.ipsetList[kubeLoadBalancerSet].Name, "dst,dst", "-j", "RETURN")
1674 proxier.filterRules.Write(
1675 "-A", string(kubeIPVSFilterChain),
1676 "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name, "dst,dst", "-j", "RETURN")
1677 proxier.filterRules.Write(
1678 "-A", string(kubeIPVSFilterChain),
1679 "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, "dst,dst", "-j", "RETURN")
1680 proxier.filterRules.Write(
1681 "-A", string(kubeIPVSFilterChain),
1682 "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name, "dst,dst", "-j", "RETURN")
1683 proxier.filterRules.Write(
1684 "-A", string(kubeIPVSFilterChain),
1685 "-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst", "-j", "RETURN")
1686 proxier.filterRules.Write(
1687 "-A", string(kubeIPVSFilterChain),
1688 "-m", "conntrack", "--ctstate", "NEW",
1689 "-m", "set", "--match-set", proxier.ipsetList[kubeIPVSSet].Name, "dst", "-j", "REJECT")
1690
1691
1692
1693
1694
1695 proxier.natRules.Write(
1696 "-A", string(kubePostroutingChain),
1697 "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
1698 "-j", "RETURN",
1699 )
1700
1701 proxier.natRules.Write(
1702 "-A", string(kubePostroutingChain),
1703
1704 "-j", "MARK", "--xor-mark", proxier.masqueradeMark,
1705 )
1706 masqRule := []string{
1707 "-A", string(kubePostroutingChain),
1708 "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
1709 "-j", "MASQUERADE",
1710 }
1711 if proxier.iptables.HasRandomFully() {
1712 masqRule = append(masqRule, "--random-fully")
1713 }
1714 proxier.natRules.Write(masqRule)
1715
1716
1717
1718
1719 proxier.natRules.Write(
1720 "-A", string(kubeMarkMasqChain),
1721 "-j", "MARK", "--or-mark", proxier.masqueradeMark,
1722 )
1723
1724
1725 proxier.filterRules.Write("COMMIT")
1726 proxier.natRules.Write("COMMIT")
1727 }
1728
1729 func (proxier *Proxier) acceptIPVSTraffic() {
1730 sets := []string{kubeClusterIPSet, kubeLoadBalancerSet}
1731 for _, set := range sets {
1732 var matchType string
1733 if !proxier.ipsetList[set].isEmpty() {
1734 switch proxier.ipsetList[set].SetType {
1735 case utilipset.BitmapPort:
1736 matchType = "dst"
1737 default:
1738 matchType = "dst,dst"
1739 }
1740 proxier.natRules.Write(
1741 "-A", string(kubeServicesChain),
1742 "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType,
1743 "-j", "ACCEPT",
1744 )
1745 }
1746 }
1747 }
1748
1749
1750 func (proxier *Proxier) createAndLinkKubeChain() {
1751 for _, ch := range iptablesChains {
1752 if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
1753 klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
1754 return
1755 }
1756 if ch.table == utiliptables.TableNAT {
1757 proxier.natChains.Write(utiliptables.MakeChainLine(ch.chain))
1758 } else {
1759 proxier.filterChains.Write(utiliptables.MakeChainLine(ch.chain))
1760 }
1761 }
1762
1763 for _, jc := range iptablesJumpChain {
1764 args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
1765 if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
1766 klog.ErrorS(err, "Failed to ensure chain jumps", "table", jc.table, "srcChain", jc.from, "dstChain", jc.to)
1767 }
1768 }
1769
1770 }
1771
1772 func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, alreadyBoundAddrs sets.Set[string]) error {
1773 appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
1774 if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
1775 if appliedVirtualServer == nil {
1776
1777 klog.V(3).InfoS("Adding new service", "serviceName", svcName, "virtualServer", vs)
1778 if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
1779 klog.ErrorS(err, "Failed to add IPVS service", "serviceName", svcName)
1780 return err
1781 }
1782 } else {
1783
1784
1785 klog.V(3).InfoS("IPVS service was changed", "serviceName", svcName)
1786 if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
1787 klog.ErrorS(err, "Failed to update IPVS service")
1788 return err
1789 }
1790 }
1791 }
1792
1793
1794 if bindAddr {
1795
1796
1797 if alreadyBoundAddrs != nil && alreadyBoundAddrs.Has(vs.Address.String()) {
1798 return nil
1799 }
1800
1801 klog.V(4).InfoS("Bind address", "address", vs.Address)
1802 _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), defaultDummyDevice)
1803 if err != nil {
1804 klog.ErrorS(err, "Failed to bind service address to dummy device", "serviceName", svcName)
1805 return err
1806 }
1807 }
1808
1809 return nil
1810 }
1811
1812 func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
1813 appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
1814 if err != nil {
1815 klog.ErrorS(err, "Failed to get IPVS service")
1816 return err
1817 }
1818 if appliedVirtualServer == nil {
1819 return errors.New("IPVS virtual service does not exist")
1820 }
1821
1822
1823 curEndpoints := sets.New[string]()
1824 curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
1825 if err != nil {
1826 klog.ErrorS(err, "Failed to list IPVS destinations")
1827 return err
1828 }
1829 for _, des := range curDests {
1830 curEndpoints.Insert(des.String())
1831 }
1832
1833 endpoints := proxier.endpointsMap[svcPortName]
1834
1835
1836
1837
1838
1839 svcInfo, ok := proxier.svcPortMap[svcPortName]
1840 if !ok {
1841 klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
1842 } else {
1843 clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
1844 if onlyNodeLocalEndpoints {
1845 if len(localEndpoints) > 0 {
1846 endpoints = localEndpoints
1847 } else {
1848
1849
1850
1851
1852 endpoints = clusterEndpoints
1853
1854 if hasAnyEndpoints && svcInfo.InternalPolicyLocal() {
1855 proxier.serviceNoLocalEndpointsInternal.Insert(svcPortName.NamespacedName.String())
1856 }
1857
1858 if hasAnyEndpoints && svcInfo.ExternalPolicyLocal() {
1859 proxier.serviceNoLocalEndpointsExternal.Insert(svcPortName.NamespacedName.String())
1860 }
1861 }
1862 } else {
1863 endpoints = clusterEndpoints
1864 }
1865 }
1866
1867 newEndpoints := sets.New[string]()
1868 for _, epInfo := range endpoints {
1869 newEndpoints.Insert(epInfo.String())
1870 }
1871
1872
1873 for _, ep := range newEndpoints.UnsortedList() {
1874 ip, port, err := net.SplitHostPort(ep)
1875 if err != nil {
1876 klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
1877 continue
1878 }
1879 portNum, err := strconv.Atoi(port)
1880 if err != nil {
1881 klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
1882 continue
1883 }
1884
1885 newDest := &utilipvs.RealServer{
1886 Address: netutils.ParseIPSloppy(ip),
1887 Port: uint16(portNum),
1888 Weight: 1,
1889 }
1890
1891 if curEndpoints.Has(ep) {
1892
1893
1894 if proxier.initialSync {
1895 for _, dest := range curDests {
1896 if dest.Weight != newDest.Weight {
1897 err = proxier.ipvs.UpdateRealServer(appliedVirtualServer, newDest)
1898 if err != nil {
1899 klog.ErrorS(err, "Failed to update destination", "newDest", newDest)
1900 continue
1901 }
1902 }
1903 }
1904 }
1905
1906 uniqueRS := GetUniqueRSName(vs, newDest)
1907 if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
1908 continue
1909 }
1910 klog.V(5).InfoS("new ep is in graceful delete list", "uniqueRealServer", uniqueRS)
1911 err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
1912 if err != nil {
1913 klog.ErrorS(err, "Failed to delete endpoint in gracefulDeleteQueue", "endpoint", ep)
1914 continue
1915 }
1916 }
1917 err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
1918 if err != nil {
1919 klog.ErrorS(err, "Failed to add destination", "newDest", newDest)
1920 continue
1921 }
1922 }
1923
1924
1925 for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
1926
1927 uniqueRS := vs.String() + "/" + ep
1928 if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
1929 continue
1930 }
1931 ip, port, err := net.SplitHostPort(ep)
1932 if err != nil {
1933 klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
1934 continue
1935 }
1936 portNum, err := strconv.Atoi(port)
1937 if err != nil {
1938 klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
1939 continue
1940 }
1941
1942 delDest := &utilipvs.RealServer{
1943 Address: netutils.ParseIPSloppy(ip),
1944 Port: uint16(portNum),
1945 }
1946
1947 klog.V(5).InfoS("Using graceful delete", "uniqueRealServer", uniqueRS)
1948 err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
1949 if err != nil {
1950 klog.ErrorS(err, "Failed to delete destination", "uniqueRealServer", uniqueRS)
1951 continue
1952 }
1953 }
1954 return nil
1955 }
1956
1957 func (proxier *Proxier) cleanLegacyService(activeServices sets.Set[string], currentServices map[string]*utilipvs.VirtualServer) {
1958 for cs, svc := range currentServices {
1959 if proxier.isIPInExcludeCIDRs(svc.Address) {
1960 continue
1961 }
1962 if getIPFamily(svc.Address) != proxier.ipFamily {
1963
1964 continue
1965 }
1966 if !activeServices.Has(cs) {
1967 klog.V(4).InfoS("Delete service", "virtualServer", svc)
1968 if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
1969 klog.ErrorS(err, "Failed to delete service", "virtualServer", svc)
1970 }
1971 }
1972 }
1973 }
1974
1975 func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool {
1976
1977 for _, excludedCIDR := range proxier.excludeCIDRs {
1978 if excludedCIDR.Contains(ip) {
1979 return true
1980 }
1981 }
1982 return false
1983 }
1984
1985 func getIPFamily(ip net.IP) v1.IPFamily {
1986 if netutils.IsIPv4(ip) {
1987 return v1.IPv4Protocol
1988 }
1989 return v1.IPv6Protocol
1990 }
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
View as plain text