...

Source file src/k8s.io/kubernetes/pkg/proxy/ipvs/proxier.go

Documentation: k8s.io/kubernetes/pkg/proxy/ipvs

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2017 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package ipvs
    21  
    22  import (
    23  	"bytes"
    24  	"errors"
    25  	"fmt"
    26  	"io"
    27  	"net"
    28  	"reflect"
    29  	"strconv"
    30  	"strings"
    31  	"sync"
    32  	"sync/atomic"
    33  	"time"
    34  
    35  	"k8s.io/klog/v2"
    36  	utilexec "k8s.io/utils/exec"
    37  	netutils "k8s.io/utils/net"
    38  
    39  	v1 "k8s.io/api/core/v1"
    40  	discovery "k8s.io/api/discovery/v1"
    41  	"k8s.io/apimachinery/pkg/types"
    42  	"k8s.io/apimachinery/pkg/util/sets"
    43  	"k8s.io/apimachinery/pkg/util/version"
    44  	"k8s.io/apimachinery/pkg/util/wait"
    45  	"k8s.io/client-go/tools/events"
    46  	utilsysctl "k8s.io/component-helpers/node/util/sysctl"
    47  	"k8s.io/kubernetes/pkg/proxy"
    48  	"k8s.io/kubernetes/pkg/proxy/conntrack"
    49  	"k8s.io/kubernetes/pkg/proxy/healthcheck"
    50  	utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
    51  	utilipvs "k8s.io/kubernetes/pkg/proxy/ipvs/util"
    52  	"k8s.io/kubernetes/pkg/proxy/metaproxier"
    53  	"k8s.io/kubernetes/pkg/proxy/metrics"
    54  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    55  	proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
    56  	"k8s.io/kubernetes/pkg/util/async"
    57  	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
    58  	utilkernel "k8s.io/kubernetes/pkg/util/kernel"
    59  )
    60  
    61  const (
    62  	// kubeServicesChain is the services portal chain
    63  	kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
    64  
    65  	// kubeProxyFirewallChain is the kube-proxy firewall chain.
    66  	kubeProxyFirewallChain utiliptables.Chain = "KUBE-PROXY-FIREWALL"
    67  
    68  	// kubeSourceRangesFirewallChain is the firewall subchain for LoadBalancerSourceRanges.
    69  	kubeSourceRangesFirewallChain utiliptables.Chain = "KUBE-SOURCE-RANGES-FIREWALL"
    70  
    71  	// kubePostroutingChain is the kubernetes postrouting chain
    72  	kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
    73  
    74  	// kubeMarkMasqChain is the mark-for-masquerade chain
    75  	kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
    76  
    77  	// kubeNodePortChain is the kubernetes node port chain
    78  	kubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"
    79  
    80  	// kubeForwardChain is the kubernetes forward chain
    81  	kubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
    82  
    83  	// kubeLoadBalancerChain is the kubernetes chain for loadbalancer type service
    84  	kubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER"
    85  
    86  	// kubeIPVSFilterChain filters external access to main netns
    87  	// https://github.com/kubernetes/kubernetes/issues/72236
    88  	kubeIPVSFilterChain utiliptables.Chain = "KUBE-IPVS-FILTER"
    89  
    90  	// kubeIPVSOutFilterChain filters access to load balancer services from node.
    91  	// https://github.com/kubernetes/kubernetes/issues/119656
    92  	kubeIPVSOutFilterChain utiliptables.Chain = "KUBE-IPVS-OUT-FILTER"
    93  
    94  	// defaultScheduler is the default ipvs scheduler algorithm - round robin.
    95  	defaultScheduler = "rr"
    96  
    97  	// defaultDummyDevice is the default dummy interface which ipvs service address will bind to it.
    98  	defaultDummyDevice = "kube-ipvs0"
    99  )
   100  
   101  // In IPVS proxy mode, the following flags need to be set
   102  const (
   103  	sysctlVSConnTrack             = "net/ipv4/vs/conntrack"
   104  	sysctlConnReuse               = "net/ipv4/vs/conn_reuse_mode"
   105  	sysctlExpireNoDestConn        = "net/ipv4/vs/expire_nodest_conn"
   106  	sysctlExpireQuiescentTemplate = "net/ipv4/vs/expire_quiescent_template"
   107  	sysctlForward                 = "net/ipv4/ip_forward"
   108  	sysctlArpIgnore               = "net/ipv4/conf/all/arp_ignore"
   109  	sysctlArpAnnounce             = "net/ipv4/conf/all/arp_announce"
   110  )
   111  
   112  // NewDualStackProxier returns a new Proxier for dual-stack operation
   113  func NewDualStackProxier(
   114  	ipt [2]utiliptables.Interface,
   115  	ipvs utilipvs.Interface,
   116  	ipset utilipset.Interface,
   117  	sysctl utilsysctl.Interface,
   118  	exec utilexec.Interface,
   119  	syncPeriod time.Duration,
   120  	minSyncPeriod time.Duration,
   121  	excludeCIDRs []string,
   122  	strictARP bool,
   123  	tcpTimeout time.Duration,
   124  	tcpFinTimeout time.Duration,
   125  	udpTimeout time.Duration,
   126  	masqueradeAll bool,
   127  	masqueradeBit int,
   128  	localDetectors [2]proxyutiliptables.LocalTrafficDetector,
   129  	hostname string,
   130  	nodeIPs map[v1.IPFamily]net.IP,
   131  	recorder events.EventRecorder,
   132  	healthzServer *healthcheck.ProxierHealthServer,
   133  	scheduler string,
   134  	nodePortAddresses []string,
   135  	initOnly bool,
   136  ) (proxy.Provider, error) {
   137  	// Create an ipv4 instance of the single-stack proxier
   138  	ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], ipvs, ipset, sysctl,
   139  		exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
   140  		tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
   141  		localDetectors[0], hostname, nodeIPs[v1.IPv4Protocol], recorder,
   142  		healthzServer, scheduler, nodePortAddresses, initOnly)
   143  	if err != nil {
   144  		return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
   145  	}
   146  
   147  	ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], ipvs, ipset, sysctl,
   148  		exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
   149  		tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
   150  		localDetectors[1], hostname, nodeIPs[v1.IPv6Protocol], recorder,
   151  		healthzServer, scheduler, nodePortAddresses, initOnly)
   152  	if err != nil {
   153  		return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
   154  	}
   155  	if initOnly {
   156  		return nil, nil
   157  	}
   158  
   159  	// Return a meta-proxier that dispatch calls between the two
   160  	// single-stack proxier instances
   161  	return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
   162  }
   163  
   164  // Proxier is an ipvs based proxy for connections between a localhost:lport
   165  // and services that provide the actual backends.
   166  type Proxier struct {
   167  	// the ipfamily on which this proxy is operating on.
   168  	ipFamily v1.IPFamily
   169  	// endpointsChanges and serviceChanges contains all changes to endpoints and
   170  	// services that happened since last syncProxyRules call. For a single object,
   171  	// changes are accumulated, i.e. previous is state from before all of them,
   172  	// current is state after applying all of those.
   173  	endpointsChanges *proxy.EndpointsChangeTracker
   174  	serviceChanges   *proxy.ServiceChangeTracker
   175  
   176  	mu           sync.Mutex // protects the following fields
   177  	svcPortMap   proxy.ServicePortMap
   178  	endpointsMap proxy.EndpointsMap
   179  	nodeLabels   map[string]string
   180  	// initialSync is a bool indicating if the proxier is syncing for the first time.
   181  	// It is set to true when a new proxier is initialized and then set to false on all
   182  	// future syncs.
   183  	// This lets us run specific logic that's required only during proxy startup.
   184  	// For eg: it enables us to update weights of existing destinations only on startup
   185  	// saving us the cost of querying and updating real servers during every sync.
   186  	initialSync bool
   187  	// endpointSlicesSynced, and servicesSynced are set to true when
   188  	// corresponding objects are synced after startup. This is used to avoid updating
   189  	// ipvs rules with some partial data after kube-proxy restart.
   190  	endpointSlicesSynced bool
   191  	servicesSynced       bool
   192  	initialized          int32
   193  	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
   194  
   195  	// These are effectively const and do not need the mutex to be held.
   196  	syncPeriod    time.Duration
   197  	minSyncPeriod time.Duration
   198  	// Values are CIDR's to exclude when cleaning up IPVS rules.
   199  	excludeCIDRs []*net.IPNet
   200  	// Set to true to set sysctls arp_ignore and arp_announce
   201  	strictARP      bool
   202  	iptables       utiliptables.Interface
   203  	ipvs           utilipvs.Interface
   204  	ipset          utilipset.Interface
   205  	conntrack      conntrack.Interface
   206  	masqueradeAll  bool
   207  	masqueradeMark string
   208  	localDetector  proxyutiliptables.LocalTrafficDetector
   209  	hostname       string
   210  	nodeIP         net.IP
   211  	recorder       events.EventRecorder
   212  
   213  	serviceHealthServer healthcheck.ServiceHealthServer
   214  	healthzServer       *healthcheck.ProxierHealthServer
   215  
   216  	ipvsScheduler string
   217  	// The following buffers are used to reuse memory and avoid allocations
   218  	// that are significantly impacting performance.
   219  	iptablesData     *bytes.Buffer
   220  	filterChainsData *bytes.Buffer
   221  	natChains        proxyutil.LineBuffer
   222  	filterChains     proxyutil.LineBuffer
   223  	natRules         proxyutil.LineBuffer
   224  	filterRules      proxyutil.LineBuffer
   225  	// Added as a member to the struct to allow injection for testing.
   226  	netlinkHandle NetLinkHandle
   227  	// ipsetList is the list of ipsets that ipvs proxier used.
   228  	ipsetList map[string]*IPSet
   229  	// nodePortAddresses selects the interfaces where nodePort works.
   230  	nodePortAddresses *proxyutil.NodePortAddresses
   231  	// networkInterfacer defines an interface for several net library functions.
   232  	// Inject for test purpose.
   233  	networkInterfacer     proxyutil.NetworkInterfacer
   234  	gracefuldeleteManager *GracefulTerminationManager
   235  	// serviceNoLocalEndpointsInternal represents the set of services that couldn't be applied
   236  	// due to the absence of local endpoints when the internal traffic policy is "Local".
   237  	// It is used to publish the sync_proxy_rules_no_endpoints_total
   238  	// metric with the traffic_policy label set to "internal".
   239  	// A Set is used here since we end up calculating endpoint topology multiple times for the same Service
   240  	// if it has multiple ports but each Service should only be counted once.
   241  	serviceNoLocalEndpointsInternal sets.Set[string]
   242  	// serviceNoLocalEndpointsExternal represents the set of services that couldn't be applied
   243  	// due to the absence of any endpoints when the external traffic policy is "Local".
   244  	// It is used to publish the sync_proxy_rules_no_endpoints_total
   245  	// metric with the traffic_policy label set to "external".
   246  	// A Set is used here since we end up calculating endpoint topology multiple times for the same Service
   247  	// if it has multiple ports but each Service should only be counted once.
   248  	serviceNoLocalEndpointsExternal sets.Set[string]
   249  	// lbNoNodeAccessIPPortProtocolEntries represents the set of loadBalancers IP + Port + Protocol that should not be accessible from K8s nodes
   250  	// We cannot directly restrict LB access from node using LoadBalancerSourceRanges, we need to install
   251  	// additional iptables rules.
   252  	// (ref: https://github.com/kubernetes/kubernetes/issues/119656)
   253  	lbNoNodeAccessIPPortProtocolEntries []*utilipset.Entry
   254  }
   255  
   256  // Proxier implements proxy.Provider
   257  var _ proxy.Provider = &Proxier{}
   258  
   259  // NewProxier returns a new Proxier given an iptables and ipvs Interface instance.
   260  // Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine.
   261  // An error will be returned if it fails to update or acquire the initial lock.
   262  // Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
   263  // will not terminate if a particular iptables or ipvs call fails.
   264  func NewProxier(ipFamily v1.IPFamily,
   265  	ipt utiliptables.Interface,
   266  	ipvs utilipvs.Interface,
   267  	ipset utilipset.Interface,
   268  	sysctl utilsysctl.Interface,
   269  	exec utilexec.Interface,
   270  	syncPeriod time.Duration,
   271  	minSyncPeriod time.Duration,
   272  	excludeCIDRs []string,
   273  	strictARP bool,
   274  	tcpTimeout time.Duration,
   275  	tcpFinTimeout time.Duration,
   276  	udpTimeout time.Duration,
   277  	masqueradeAll bool,
   278  	masqueradeBit int,
   279  	localDetector proxyutiliptables.LocalTrafficDetector,
   280  	hostname string,
   281  	nodeIP net.IP,
   282  	recorder events.EventRecorder,
   283  	healthzServer *healthcheck.ProxierHealthServer,
   284  	scheduler string,
   285  	nodePortAddressStrings []string,
   286  	initOnly bool,
   287  ) (*Proxier, error) {
   288  	// Set the conntrack sysctl we need for
   289  	if err := proxyutil.EnsureSysctl(sysctl, sysctlVSConnTrack, 1); err != nil {
   290  		return nil, err
   291  	}
   292  
   293  	kernelVersion, err := utilkernel.GetVersion()
   294  	if err != nil {
   295  		return nil, fmt.Errorf("failed to get kernel version: %w", err)
   296  	}
   297  
   298  	if kernelVersion.LessThan(version.MustParseGeneric(utilkernel.IPVSConnReuseModeMinSupportedKernelVersion)) {
   299  		klog.ErrorS(nil, "Can't set sysctl, kernel version doesn't satisfy minimum version requirements", "sysctl", sysctlConnReuse, "minimumKernelVersion", utilkernel.IPVSConnReuseModeMinSupportedKernelVersion)
   300  	} else if kernelVersion.AtLeast(version.MustParseGeneric(utilkernel.IPVSConnReuseModeFixedKernelVersion)) {
   301  		// https://github.com/kubernetes/kubernetes/issues/93297
   302  		klog.V(2).InfoS("Left as-is", "sysctl", sysctlConnReuse)
   303  	} else {
   304  		// Set the connection reuse mode
   305  		if err := proxyutil.EnsureSysctl(sysctl, sysctlConnReuse, 0); err != nil {
   306  			return nil, err
   307  		}
   308  	}
   309  
   310  	// Set the expire_nodest_conn sysctl we need for
   311  	if err := proxyutil.EnsureSysctl(sysctl, sysctlExpireNoDestConn, 1); err != nil {
   312  		return nil, err
   313  	}
   314  
   315  	// Set the expire_quiescent_template sysctl we need for
   316  	if err := proxyutil.EnsureSysctl(sysctl, sysctlExpireQuiescentTemplate, 1); err != nil {
   317  		return nil, err
   318  	}
   319  
   320  	// Set the ip_forward sysctl we need for
   321  	if err := proxyutil.EnsureSysctl(sysctl, sysctlForward, 1); err != nil {
   322  		return nil, err
   323  	}
   324  
   325  	if strictARP {
   326  		// Set the arp_ignore sysctl we need for
   327  		if err := proxyutil.EnsureSysctl(sysctl, sysctlArpIgnore, 1); err != nil {
   328  			return nil, err
   329  		}
   330  
   331  		// Set the arp_announce sysctl we need for
   332  		if err := proxyutil.EnsureSysctl(sysctl, sysctlArpAnnounce, 2); err != nil {
   333  			return nil, err
   334  		}
   335  	}
   336  
   337  	// Configure IPVS timeouts if any one of the timeout parameters have been set.
   338  	// This is the equivalent to running ipvsadm --set, a value of 0 indicates the
   339  	// current system timeout should be preserved
   340  	if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
   341  		if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
   342  			klog.ErrorS(err, "Failed to configure IPVS timeouts")
   343  		}
   344  	}
   345  
   346  	if initOnly {
   347  		klog.InfoS("System initialized and --init-only specified")
   348  		return nil, nil
   349  	}
   350  
   351  	// Generate the masquerade mark to use for SNAT rules.
   352  	masqueradeValue := 1 << uint(masqueradeBit)
   353  	masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
   354  
   355  	klog.V(2).InfoS("Record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)
   356  
   357  	if len(scheduler) == 0 {
   358  		klog.InfoS("IPVS scheduler not specified, use rr by default")
   359  		scheduler = defaultScheduler
   360  	}
   361  
   362  	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings, nil)
   363  
   364  	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
   365  
   366  	// excludeCIDRs has been validated before, here we just parse it to IPNet list
   367  	parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs)
   368  
   369  	proxier := &Proxier{
   370  		ipFamily:              ipFamily,
   371  		svcPortMap:            make(proxy.ServicePortMap),
   372  		serviceChanges:        proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
   373  		endpointsMap:          make(proxy.EndpointsMap),
   374  		endpointsChanges:      proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil),
   375  		initialSync:           true,
   376  		syncPeriod:            syncPeriod,
   377  		minSyncPeriod:         minSyncPeriod,
   378  		excludeCIDRs:          parsedExcludeCIDRs,
   379  		iptables:              ipt,
   380  		masqueradeAll:         masqueradeAll,
   381  		masqueradeMark:        masqueradeMark,
   382  		conntrack:             conntrack.NewExec(exec),
   383  		localDetector:         localDetector,
   384  		hostname:              hostname,
   385  		nodeIP:                nodeIP,
   386  		recorder:              recorder,
   387  		serviceHealthServer:   serviceHealthServer,
   388  		healthzServer:         healthzServer,
   389  		ipvs:                  ipvs,
   390  		ipvsScheduler:         scheduler,
   391  		iptablesData:          bytes.NewBuffer(nil),
   392  		filterChainsData:      bytes.NewBuffer(nil),
   393  		natChains:             proxyutil.NewLineBuffer(),
   394  		natRules:              proxyutil.NewLineBuffer(),
   395  		filterChains:          proxyutil.NewLineBuffer(),
   396  		filterRules:           proxyutil.NewLineBuffer(),
   397  		netlinkHandle:         NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
   398  		ipset:                 ipset,
   399  		nodePortAddresses:     nodePortAddresses,
   400  		networkInterfacer:     proxyutil.RealNetwork{},
   401  		gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
   402  	}
   403  	// initialize ipsetList with all sets we needed
   404  	proxier.ipsetList = make(map[string]*IPSet)
   405  	for _, is := range ipsetInfo {
   406  		proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment)
   407  	}
   408  	burstSyncs := 2
   409  	klog.V(2).InfoS("ipvs sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
   410  	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
   411  	proxier.gracefuldeleteManager.Run()
   412  	return proxier, nil
   413  }
   414  
   415  func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
   416  	var filteredCIDRs []string
   417  	for _, cidr := range cidrs {
   418  		if netutils.IsIPv6CIDRString(cidr) == wantIPv6 {
   419  			filteredCIDRs = append(filteredCIDRs, cidr)
   420  		}
   421  	}
   422  	return filteredCIDRs
   423  }
   424  
   425  // iptablesJumpChain is tables of iptables chains that ipvs proxier used to install iptables or cleanup iptables.
   426  // `to` is the iptables chain we want to operate.
   427  // `from` is the source iptables chain
   428  var iptablesJumpChain = []struct {
   429  	table   utiliptables.Table
   430  	from    utiliptables.Chain
   431  	to      utiliptables.Chain
   432  	comment string
   433  }{
   434  	{utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
   435  	{utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
   436  	{utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
   437  	{utiliptables.TableFilter, utiliptables.ChainForward, kubeForwardChain, "kubernetes forwarding rules"},
   438  	{utiliptables.TableFilter, utiliptables.ChainInput, kubeNodePortChain, "kubernetes health check rules"},
   439  	{utiliptables.TableFilter, utiliptables.ChainInput, kubeProxyFirewallChain, "kube-proxy firewall rules"},
   440  	{utiliptables.TableFilter, utiliptables.ChainForward, kubeProxyFirewallChain, "kube-proxy firewall rules"},
   441  	{utiliptables.TableFilter, utiliptables.ChainInput, kubeIPVSFilterChain, "kubernetes ipvs access filter"},
   442  	{utiliptables.TableFilter, utiliptables.ChainOutput, kubeIPVSOutFilterChain, "kubernetes ipvs access filter"},
   443  }
   444  
   445  var iptablesChains = []struct {
   446  	table utiliptables.Table
   447  	chain utiliptables.Chain
   448  }{
   449  	{utiliptables.TableNAT, kubeServicesChain},
   450  	{utiliptables.TableNAT, kubePostroutingChain},
   451  	{utiliptables.TableNAT, kubeNodePortChain},
   452  	{utiliptables.TableNAT, kubeLoadBalancerChain},
   453  	{utiliptables.TableNAT, kubeMarkMasqChain},
   454  	{utiliptables.TableFilter, kubeForwardChain},
   455  	{utiliptables.TableFilter, kubeNodePortChain},
   456  	{utiliptables.TableFilter, kubeProxyFirewallChain},
   457  	{utiliptables.TableFilter, kubeSourceRangesFirewallChain},
   458  	{utiliptables.TableFilter, kubeIPVSFilterChain},
   459  	{utiliptables.TableFilter, kubeIPVSOutFilterChain},
   460  }
   461  
   462  var iptablesCleanupChains = []struct {
   463  	table utiliptables.Table
   464  	chain utiliptables.Chain
   465  }{
   466  	{utiliptables.TableNAT, kubeServicesChain},
   467  	{utiliptables.TableNAT, kubePostroutingChain},
   468  	{utiliptables.TableNAT, kubeNodePortChain},
   469  	{utiliptables.TableNAT, kubeLoadBalancerChain},
   470  	{utiliptables.TableFilter, kubeForwardChain},
   471  	{utiliptables.TableFilter, kubeNodePortChain},
   472  	{utiliptables.TableFilter, kubeProxyFirewallChain},
   473  	{utiliptables.TableFilter, kubeSourceRangesFirewallChain},
   474  	{utiliptables.TableFilter, kubeIPVSFilterChain},
   475  	{utiliptables.TableFilter, kubeIPVSOutFilterChain},
   476  }
   477  
   478  // ipsetInfo is all ipset we needed in ipvs proxier
   479  var ipsetInfo = []struct {
   480  	name    string
   481  	setType utilipset.Type
   482  	comment string
   483  }{
   484  	{kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
   485  	{kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
   486  	{kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
   487  	{kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment},
   488  	{kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
   489  	{kubeLoadBalancerFWSet, utilipset.HashIPPort, kubeLoadBalancerFWSetComment},
   490  	{kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
   491  	{kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
   492  	{kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
   493  	{kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment},
   494  	{kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment},
   495  	{kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment},
   496  	{kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
   497  	{kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
   498  	{kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
   499  	{kubeHealthCheckNodePortSet, utilipset.BitmapPort, kubeHealthCheckNodePortSetComment},
   500  	{kubeIPVSSet, utilipset.HashIP, kubeIPVSSetComment},
   501  }
   502  
   503  // ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
   504  // `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
   505  // example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
   506  // ipsets with other match rules will be created Individually.
   507  // Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP.
   508  var ipsetWithIptablesChain = []struct {
   509  	name          string
   510  	table         utiliptables.Table
   511  	from          string
   512  	to            string
   513  	matchType     string
   514  	protocolMatch string
   515  }{
   516  	{kubeLoopBackIPSet, utiliptables.TableNAT, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
   517  	{kubeLoadBalancerSet, utiliptables.TableNAT, string(kubeServicesChain), string(kubeLoadBalancerChain), "dst,dst", ""},
   518  	{kubeLoadBalancerLocalSet, utiliptables.TableNAT, string(kubeLoadBalancerChain), "RETURN", "dst,dst", ""},
   519  	{kubeNodePortLocalSetTCP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst", utilipset.ProtocolTCP},
   520  	{kubeNodePortSetTCP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst", utilipset.ProtocolTCP},
   521  	{kubeNodePortLocalSetUDP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP},
   522  	{kubeNodePortSetUDP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst", utilipset.ProtocolUDP},
   523  	{kubeNodePortLocalSetSCTP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP},
   524  	{kubeNodePortSetSCTP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
   525  
   526  	{kubeLoadBalancerFWSet, utiliptables.TableFilter, string(kubeProxyFirewallChain), string(kubeSourceRangesFirewallChain), "dst,dst", ""},
   527  	{kubeLoadBalancerSourceCIDRSet, utiliptables.TableFilter, string(kubeSourceRangesFirewallChain), "RETURN", "dst,dst,src", ""},
   528  	{kubeLoadBalancerSourceIPSet, utiliptables.TableFilter, string(kubeSourceRangesFirewallChain), "RETURN", "dst,dst,src", ""},
   529  }
   530  
   531  // internal struct for string service information
   532  type servicePortInfo struct {
   533  	*proxy.BaseServicePortInfo
   534  	// The following fields are computed and stored for performance reasons.
   535  	nameString string
   536  }
   537  
   538  // returns a new proxy.ServicePort which abstracts a serviceInfo
   539  func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
   540  	svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
   541  
   542  	// Store the following for performance reasons.
   543  	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
   544  	svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
   545  	svcPort.nameString = svcPortName.String()
   546  
   547  	return svcPort
   548  }
   549  
   550  // getFirstColumn reads all the content from r into memory and return a
   551  // slice which consists of the first word from each line.
   552  func getFirstColumn(r io.Reader) ([]string, error) {
   553  	b, err := io.ReadAll(r)
   554  	if err != nil {
   555  		return nil, err
   556  	}
   557  
   558  	lines := strings.Split(string(b), "\n")
   559  	words := make([]string, 0, len(lines))
   560  	for i := range lines {
   561  		fields := strings.Fields(lines[i])
   562  		if len(fields) > 0 {
   563  			words = append(words, fields[0])
   564  		}
   565  	}
   566  	return words, nil
   567  }
   568  
   569  // CanUseIPVSProxier checks if we can use the ipvs Proxier.
   570  // The ipset version and the scheduler are checked. If any virtual servers (VS)
   571  // already exist with the configured scheduler, we just return. Otherwise
   572  // we check if a dummy VS can be configured with the configured scheduler.
   573  // Kernel modules will be loaded automatically if necessary.
   574  func CanUseIPVSProxier(ipvs utilipvs.Interface, ipsetver IPSetVersioner, scheduler string) error {
   575  	// BUG: https://github.com/moby/ipvs/issues/27
   576  	// If ipvs is not compiled into the kernel no error is returned and handle==nil.
   577  	// This in turn causes ipvs.GetVirtualServers and ipvs.AddVirtualServer
   578  	// to return ok (err==nil). If/when this bug is fixed parameter "ipvs" will be nil
   579  	// if ipvs is not supported by the kernel. Until then a re-read work-around is used.
   580  	if ipvs == nil {
   581  		return fmt.Errorf("Ipvs not supported by the kernel")
   582  	}
   583  
   584  	// Check ipset version
   585  	versionString, err := ipsetver.GetVersion()
   586  	if err != nil {
   587  		return fmt.Errorf("error getting ipset version, error: %v", err)
   588  	}
   589  	if !checkMinVersion(versionString) {
   590  		return fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion)
   591  	}
   592  
   593  	if scheduler == "" {
   594  		scheduler = defaultScheduler
   595  	}
   596  
   597  	// If any virtual server (VS) using the scheduler exist we skip the checks.
   598  	vservers, err := ipvs.GetVirtualServers()
   599  	if err != nil {
   600  		klog.ErrorS(err, "Can't read the ipvs")
   601  		return err
   602  	}
   603  	klog.V(5).InfoS("Virtual Servers", "count", len(vservers))
   604  	if len(vservers) > 0 {
   605  		// This is most likely a kube-proxy re-start. We know that ipvs works
   606  		// and if any VS uses the configured scheduler, we are done.
   607  		for _, vs := range vservers {
   608  			if vs.Scheduler == scheduler {
   609  				klog.V(5).InfoS("VS exist, Skipping checks")
   610  				return nil
   611  			}
   612  		}
   613  		klog.V(5).InfoS("No existing VS uses the configured scheduler", "scheduler", scheduler)
   614  	}
   615  
   616  	// Try to insert a dummy VS with the passed scheduler.
   617  	// We should use a VIP address that is not used on the node.
   618  	// An address "198.51.100.0" from the TEST-NET-2 range in https://datatracker.ietf.org/doc/html/rfc5737
   619  	// is used. These addresses are reserved for documentation. If the user is using
   620  	// this address for a VS anyway we *will* mess up, but that would be an invalid configuration.
   621  	// If the user has configured the address on an interface on the node (but not a VS)
   622  	// then traffic will temporarily be routed to ipvs during the probe and dropped.
   623  	// The latter case is also an invalid configuration, but the traffic impact will be minor.
   624  	// This should not be a problem if users honor reserved addresses, but cut/paste
   625  	// from documentation is not unheard of, so the restriction to not use the TEST-NET-2 range
   626  	// must be documented.
   627  	vs := utilipvs.VirtualServer{
   628  		Address:   netutils.ParseIPSloppy("198.51.100.0"),
   629  		Protocol:  "TCP",
   630  		Port:      20000,
   631  		Scheduler: scheduler,
   632  	}
   633  	if err := ipvs.AddVirtualServer(&vs); err != nil {
   634  		klog.ErrorS(err, "Could not create dummy VS", "scheduler", scheduler)
   635  		return err
   636  	}
   637  
   638  	// To overcome the BUG described above we check that the VS is *really* added.
   639  	vservers, err = ipvs.GetVirtualServers()
   640  	if err != nil {
   641  		klog.ErrorS(err, "ipvs.GetVirtualServers")
   642  		return err
   643  	}
   644  	klog.V(5).InfoS("Virtual Servers after adding dummy", "count", len(vservers))
   645  	if len(vservers) == 0 {
   646  		klog.InfoS("Dummy VS not created", "scheduler", scheduler)
   647  		return fmt.Errorf("Ipvs not supported") // This is a BUG work-around
   648  	}
   649  	klog.V(5).InfoS("Dummy VS created", "vs", vs)
   650  
   651  	if err := ipvs.DeleteVirtualServer(&vs); err != nil {
   652  		klog.ErrorS(err, "Could not delete dummy VS")
   653  		return err
   654  	}
   655  
   656  	return nil
   657  }
   658  
   659  // CleanupIptablesLeftovers removes all iptables rules and chains created by the Proxier
   660  // It returns true if an error was encountered. Errors are logged.
   661  func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
   662  	// Unlink the iptables chains created by ipvs Proxier
   663  	for _, jc := range iptablesJumpChain {
   664  		args := []string{
   665  			"-m", "comment", "--comment", jc.comment,
   666  			"-j", string(jc.to),
   667  		}
   668  		if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
   669  			if !utiliptables.IsNotFoundError(err) {
   670  				klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
   671  				encounteredError = true
   672  			}
   673  		}
   674  	}
   675  
   676  	// Flush and remove all of our chains. Flushing all chains before removing them also removes all links between chains first.
   677  	for _, ch := range iptablesCleanupChains {
   678  		if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
   679  			if !utiliptables.IsNotFoundError(err) {
   680  				klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
   681  				encounteredError = true
   682  			}
   683  		}
   684  	}
   685  
   686  	// Remove all of our chains.
   687  	for _, ch := range iptablesCleanupChains {
   688  		if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
   689  			if !utiliptables.IsNotFoundError(err) {
   690  				klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
   691  				encounteredError = true
   692  			}
   693  		}
   694  	}
   695  
   696  	return encounteredError
   697  }
   698  
   699  // CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier.
   700  func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) {
   701  	// Clear all ipvs rules
   702  	if ipvs != nil {
   703  		err := ipvs.Flush()
   704  		if err != nil {
   705  			klog.ErrorS(err, "Error flushing ipvs rules")
   706  			encounteredError = true
   707  		}
   708  	}
   709  	// Delete dummy interface created by ipvs Proxier.
   710  	nl := NewNetLinkHandle(false)
   711  	err := nl.DeleteDummyDevice(defaultDummyDevice)
   712  	if err != nil {
   713  		klog.ErrorS(err, "Error deleting dummy device created by ipvs proxier", "device", defaultDummyDevice)
   714  		encounteredError = true
   715  	}
   716  	// Clear iptables created by ipvs Proxier.
   717  	encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
   718  	// Destroy ip sets created by ipvs Proxier.  We should call it after cleaning up
   719  	// iptables since we can NOT delete ip set which is still referenced by iptables.
   720  	for _, set := range ipsetInfo {
   721  		err = ipset.DestroySet(set.name)
   722  		if err != nil {
   723  			if !utilipset.IsNotFoundError(err) {
   724  				klog.ErrorS(err, "Error removing ipset", "ipset", set.name)
   725  				encounteredError = true
   726  			}
   727  		}
   728  	}
   729  	return encounteredError
   730  }
   731  
   732  // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
   733  func (proxier *Proxier) Sync() {
   734  	if proxier.healthzServer != nil {
   735  		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
   736  	}
   737  	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
   738  	proxier.syncRunner.Run()
   739  }
   740  
   741  // SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
   742  func (proxier *Proxier) SyncLoop() {
   743  	// Update healthz timestamp at beginning in case Sync() never succeeds.
   744  	if proxier.healthzServer != nil {
   745  		proxier.healthzServer.Updated(proxier.ipFamily)
   746  	}
   747  	// synthesize "last change queued" time as the informers are syncing.
   748  	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
   749  	proxier.syncRunner.Loop(wait.NeverStop)
   750  }
   751  
   752  func (proxier *Proxier) setInitialized(value bool) {
   753  	var initialized int32
   754  	if value {
   755  		initialized = 1
   756  	}
   757  	atomic.StoreInt32(&proxier.initialized, initialized)
   758  }
   759  
   760  func (proxier *Proxier) isInitialized() bool {
   761  	return atomic.LoadInt32(&proxier.initialized) > 0
   762  }
   763  
   764  // OnServiceAdd is called whenever creation of new service object is observed.
   765  func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
   766  	proxier.OnServiceUpdate(nil, service)
   767  }
   768  
   769  // OnServiceUpdate is called whenever modification of an existing service object is observed.
   770  func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
   771  	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
   772  		proxier.Sync()
   773  	}
   774  }
   775  
   776  // OnServiceDelete is called whenever deletion of an existing service object is observed.
   777  func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
   778  	proxier.OnServiceUpdate(service, nil)
   779  }
   780  
   781  // OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
   782  func (proxier *Proxier) OnServiceSynced() {
   783  	proxier.mu.Lock()
   784  	proxier.servicesSynced = true
   785  	proxier.setInitialized(proxier.endpointSlicesSynced)
   786  	proxier.mu.Unlock()
   787  
   788  	// Sync unconditionally - this is called once per lifetime.
   789  	proxier.syncProxyRules()
   790  }
   791  
   792  // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
   793  // is observed.
   794  func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
   795  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
   796  		proxier.Sync()
   797  	}
   798  }
   799  
   800  // OnEndpointSliceUpdate is called whenever modification of an existing endpoint
   801  // slice object is observed.
   802  func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
   803  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
   804  		proxier.Sync()
   805  	}
   806  }
   807  
   808  // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
   809  // object is observed.
   810  func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
   811  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
   812  		proxier.Sync()
   813  	}
   814  }
   815  
   816  // OnEndpointSlicesSynced is called once all the initial event handlers were
   817  // called and the state is fully propagated to local cache.
   818  func (proxier *Proxier) OnEndpointSlicesSynced() {
   819  	proxier.mu.Lock()
   820  	proxier.endpointSlicesSynced = true
   821  	proxier.setInitialized(proxier.servicesSynced)
   822  	proxier.mu.Unlock()
   823  
   824  	// Sync unconditionally - this is called once per lifetime.
   825  	proxier.syncProxyRules()
   826  }
   827  
   828  // OnNodeAdd is called whenever creation of new node object
   829  // is observed.
   830  func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
   831  	if node.Name != proxier.hostname {
   832  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
   833  		return
   834  	}
   835  
   836  	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
   837  		return
   838  	}
   839  
   840  	proxier.mu.Lock()
   841  	proxier.nodeLabels = map[string]string{}
   842  	for k, v := range node.Labels {
   843  		proxier.nodeLabels[k] = v
   844  	}
   845  	proxier.mu.Unlock()
   846  	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
   847  
   848  	proxier.Sync()
   849  }
   850  
   851  // OnNodeUpdate is called whenever modification of an existing
   852  // node object is observed.
   853  func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
   854  	if node.Name != proxier.hostname {
   855  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
   856  		return
   857  	}
   858  
   859  	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
   860  		return
   861  	}
   862  
   863  	proxier.mu.Lock()
   864  	proxier.nodeLabels = map[string]string{}
   865  	for k, v := range node.Labels {
   866  		proxier.nodeLabels[k] = v
   867  	}
   868  	proxier.mu.Unlock()
   869  	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
   870  
   871  	proxier.Sync()
   872  }
   873  
   874  // OnNodeDelete is called whenever deletion of an existing node
   875  // object is observed.
   876  func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
   877  	if node.Name != proxier.hostname {
   878  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
   879  		return
   880  	}
   881  
   882  	proxier.mu.Lock()
   883  	proxier.nodeLabels = nil
   884  	proxier.mu.Unlock()
   885  
   886  	proxier.Sync()
   887  }
   888  
   889  // OnNodeSynced is called once all the initial event handlers were
   890  // called and the state is fully propagated to local cache.
   891  func (proxier *Proxier) OnNodeSynced() {
   892  }
   893  
   894  // OnServiceCIDRsChanged is called whenever a change is observed
   895  // in any of the ServiceCIDRs, and provides complete list of service cidrs.
   896  func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
   897  
   898  // This is where all of the ipvs calls happen.
   899  func (proxier *Proxier) syncProxyRules() {
   900  	proxier.mu.Lock()
   901  	defer proxier.mu.Unlock()
   902  
   903  	// don't sync rules till we've received services and endpoints
   904  	if !proxier.isInitialized() {
   905  		klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
   906  		return
   907  	}
   908  
   909  	// It's safe to set initialSync to false here: it only acts as a flag for
   910  	// startup actions, and the mutex is held.
   911  	defer func() {
   912  		proxier.initialSync = false
   913  	}()
   914  
   915  	// Keep track of how long syncs take.
   916  	start := time.Now()
   917  	defer func() {
   918  		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
   919  		klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
   920  	}()
   921  
   922  	// We assume that if this was called, we really want to sync them,
   923  	// even if nothing changed in the meantime. In other words, callers are
   924  	// responsible for detecting no-op changes and not calling this function.
   925  	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
   926  	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
   927  
   928  	klog.V(3).InfoS("Syncing ipvs proxier rules")
   929  
   930  	proxier.serviceNoLocalEndpointsInternal = sets.New[string]()
   931  	proxier.serviceNoLocalEndpointsExternal = sets.New[string]()
   932  
   933  	proxier.lbNoNodeAccessIPPortProtocolEntries = make([]*utilipset.Entry, 0)
   934  
   935  	// Begin install iptables
   936  
   937  	// Reset all buffers used later.
   938  	// This is to avoid memory reallocations and thus improve performance.
   939  	proxier.natChains.Reset()
   940  	proxier.natRules.Reset()
   941  	proxier.filterChains.Reset()
   942  	proxier.filterRules.Reset()
   943  
   944  	// Write table headers.
   945  	proxier.filterChains.Write("*filter")
   946  	proxier.natChains.Write("*nat")
   947  
   948  	proxier.createAndLinkKubeChain()
   949  
   950  	// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
   951  	_, err := proxier.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
   952  	if err != nil {
   953  		klog.ErrorS(err, "Failed to create dummy interface", "interface", defaultDummyDevice)
   954  		return
   955  	}
   956  
   957  	// make sure ip sets exist in the system.
   958  	for _, set := range proxier.ipsetList {
   959  		if err := ensureIPSet(set); err != nil {
   960  			return
   961  		}
   962  		set.resetEntries()
   963  	}
   964  
   965  	// activeIPVSServices represents IPVS service successfully created in this round of sync
   966  	activeIPVSServices := sets.New[string]()
   967  	// activeBindAddrs Represents addresses we want on the defaultDummyDevice after this round of sync
   968  	activeBindAddrs := sets.New[string]()
   969  	// alreadyBoundAddrs Represents addresses currently assigned to the dummy interface
   970  	alreadyBoundAddrs, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice)
   971  	if err != nil {
   972  		klog.ErrorS(err, "Error listing addresses binded to dummy interface")
   973  	}
   974  	// nodeAddressSet All addresses *except* those on the dummy interface
   975  	nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddressesExcept(defaultDummyDevice)
   976  	if err != nil {
   977  		klog.ErrorS(err, "Error listing node addresses")
   978  	}
   979  
   980  	hasNodePort := false
   981  	for _, svc := range proxier.svcPortMap {
   982  		svcInfo, ok := svc.(*servicePortInfo)
   983  		if ok && svcInfo.NodePort() != 0 {
   984  			hasNodePort = true
   985  			break
   986  		}
   987  	}
   988  
   989  	// List of node IP addresses to be used as IPVS services if nodePort is set. This
   990  	// can be reused for all nodePort services.
   991  	var nodeIPs []net.IP
   992  	if hasNodePort {
   993  		if proxier.nodePortAddresses.MatchAll() {
   994  			for _, ipStr := range nodeAddressSet.UnsortedList() {
   995  				nodeIPs = append(nodeIPs, netutils.ParseIPSloppy(ipStr))
   996  			}
   997  		} else {
   998  			allNodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
   999  			if err != nil {
  1000  				klog.ErrorS(err, "Failed to get node IP address matching nodeport cidr")
  1001  			} else {
  1002  				for _, ip := range allNodeIPs {
  1003  					if !ip.IsLoopback() {
  1004  						nodeIPs = append(nodeIPs, ip)
  1005  					}
  1006  				}
  1007  			}
  1008  		}
  1009  	}
  1010  
  1011  	// Build IPVS rules for each service.
  1012  	for svcPortName, svcPort := range proxier.svcPortMap {
  1013  		svcInfo, ok := svcPort.(*servicePortInfo)
  1014  		if !ok {
  1015  			klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
  1016  			continue
  1017  		}
  1018  
  1019  		protocol := strings.ToLower(string(svcInfo.Protocol()))
  1020  		// Precompute svcNameString; with many services the many calls
  1021  		// to ServicePortName.String() show up in CPU profiles.
  1022  		svcPortNameString := svcPortName.String()
  1023  
  1024  		// Handle traffic that loops back to the originator with SNAT.
  1025  		for _, e := range proxier.endpointsMap[svcPortName] {
  1026  			ep, ok := e.(*proxy.BaseEndpointInfo)
  1027  			if !ok {
  1028  				klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e)
  1029  				continue
  1030  			}
  1031  			if !ep.IsLocal() {
  1032  				continue
  1033  			}
  1034  			epIP := ep.IP()
  1035  			epPort := ep.Port()
  1036  			// Error parsing this endpoint has been logged. Skip to next endpoint.
  1037  			if epIP == "" || epPort == 0 {
  1038  				continue
  1039  			}
  1040  			entry := &utilipset.Entry{
  1041  				IP:       epIP,
  1042  				Port:     epPort,
  1043  				Protocol: protocol,
  1044  				IP2:      epIP,
  1045  				SetType:  utilipset.HashIPPortIP,
  1046  			}
  1047  			if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
  1048  				klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
  1049  				continue
  1050  			}
  1051  			proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
  1052  		}
  1053  
  1054  		// Capture the clusterIP.
  1055  		// ipset call
  1056  		entry := &utilipset.Entry{
  1057  			IP:       svcInfo.ClusterIP().String(),
  1058  			Port:     svcInfo.Port(),
  1059  			Protocol: protocol,
  1060  			SetType:  utilipset.HashIPPort,
  1061  		}
  1062  		// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
  1063  		// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
  1064  		if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
  1065  			klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
  1066  			continue
  1067  		}
  1068  		proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
  1069  		// ipvs call
  1070  		serv := &utilipvs.VirtualServer{
  1071  			Address:   svcInfo.ClusterIP(),
  1072  			Port:      uint16(svcInfo.Port()),
  1073  			Protocol:  string(svcInfo.Protocol()),
  1074  			Scheduler: proxier.ipvsScheduler,
  1075  		}
  1076  		// Set session affinity flag and timeout for IPVS service
  1077  		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1078  			serv.Flags |= utilipvs.FlagPersistent
  1079  			serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1080  		}
  1081  		// Set the source hash flag needed for the distribution method "mh"
  1082  		if proxier.ipvsScheduler == "mh" {
  1083  			serv.Flags |= utilipvs.FlagSourceHash
  1084  		}
  1085  		// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
  1086  		if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
  1087  			activeIPVSServices.Insert(serv.String())
  1088  			activeBindAddrs.Insert(serv.Address.String())
  1089  			// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
  1090  			// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
  1091  			internalNodeLocal := false
  1092  			if svcInfo.InternalPolicyLocal() {
  1093  				internalNodeLocal = true
  1094  			}
  1095  			if err := proxier.syncEndpoint(svcPortName, internalNodeLocal, serv); err != nil {
  1096  				klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
  1097  			}
  1098  		} else {
  1099  			klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
  1100  		}
  1101  
  1102  		// Capture externalIPs.
  1103  		for _, externalIP := range svcInfo.ExternalIPs() {
  1104  			// ipset call
  1105  			entry := &utilipset.Entry{
  1106  				IP:       externalIP.String(),
  1107  				Port:     svcInfo.Port(),
  1108  				Protocol: protocol,
  1109  				SetType:  utilipset.HashIPPort,
  1110  			}
  1111  
  1112  			if svcInfo.ExternalPolicyLocal() {
  1113  				if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
  1114  					klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
  1115  					continue
  1116  				}
  1117  				proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
  1118  			} else {
  1119  				// We have to SNAT packets to external IPs.
  1120  				if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
  1121  					klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
  1122  					continue
  1123  				}
  1124  				proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
  1125  			}
  1126  
  1127  			// ipvs call
  1128  			serv := &utilipvs.VirtualServer{
  1129  				Address:   externalIP,
  1130  				Port:      uint16(svcInfo.Port()),
  1131  				Protocol:  string(svcInfo.Protocol()),
  1132  				Scheduler: proxier.ipvsScheduler,
  1133  			}
  1134  			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1135  				serv.Flags |= utilipvs.FlagPersistent
  1136  				serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1137  			}
  1138  			// Set the source hash flag needed for the distribution method "mh"
  1139  			if proxier.ipvsScheduler == "mh" {
  1140  				serv.Flags |= utilipvs.FlagSourceHash
  1141  			}
  1142  			// We must not add the address to the dummy device if it exist on another interface
  1143  			shouldBind := !nodeAddressSet.Has(serv.Address.String())
  1144  			if err := proxier.syncService(svcPortNameString, serv, shouldBind, alreadyBoundAddrs); err == nil {
  1145  				activeIPVSServices.Insert(serv.String())
  1146  				if shouldBind {
  1147  					activeBindAddrs.Insert(serv.Address.String())
  1148  				}
  1149  				if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
  1150  					klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
  1151  				}
  1152  			} else {
  1153  				klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
  1154  			}
  1155  		}
  1156  
  1157  		// Capture load-balancer ingress.
  1158  		for _, ingress := range svcInfo.LoadBalancerVIPs() {
  1159  			// ipset call
  1160  			entry = &utilipset.Entry{
  1161  				IP:       ingress.String(),
  1162  				Port:     svcInfo.Port(),
  1163  				Protocol: protocol,
  1164  				SetType:  utilipset.HashIPPort,
  1165  			}
  1166  			// add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
  1167  			// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
  1168  			// If we are proxying globally, we need to masquerade in case we cross nodes.
  1169  			// If we are proxying only locally, we can retain the source IP.
  1170  			if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
  1171  				klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
  1172  				continue
  1173  			}
  1174  			proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
  1175  			// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
  1176  			if svcInfo.ExternalPolicyLocal() {
  1177  				if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
  1178  					klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
  1179  					continue
  1180  				}
  1181  				proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
  1182  			}
  1183  			if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
  1184  				// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
  1185  				// This currently works for loadbalancers that preserves source ips.
  1186  				// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
  1187  				if valid := proxier.ipsetList[kubeLoadBalancerFWSet].validateEntry(entry); !valid {
  1188  					klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerFWSet].Name)
  1189  					continue
  1190  				}
  1191  				proxier.ipsetList[kubeLoadBalancerFWSet].activeEntries.Insert(entry.String())
  1192  				allowFromNode := false
  1193  				for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
  1194  					// ipset call
  1195  					entry = &utilipset.Entry{
  1196  						IP:       ingress.String(),
  1197  						Port:     svcInfo.Port(),
  1198  						Protocol: protocol,
  1199  						Net:      cidr.String(),
  1200  						SetType:  utilipset.HashIPPortNet,
  1201  					}
  1202  					// enumerate all white list source cidr
  1203  					if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
  1204  						klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
  1205  						continue
  1206  					}
  1207  					proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
  1208  
  1209  					if cidr.Contains(proxier.nodeIP) {
  1210  						allowFromNode = true
  1211  					}
  1212  				}
  1213  				// generally, ip route rule was added to intercept request to loadbalancer vip from the
  1214  				// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
  1215  				// Need to add the following rule to allow request on host.
  1216  				if allowFromNode {
  1217  					entry = &utilipset.Entry{
  1218  						IP:       ingress.String(),
  1219  						Port:     svcInfo.Port(),
  1220  						Protocol: protocol,
  1221  						IP2:      ingress.String(),
  1222  						SetType:  utilipset.HashIPPortIP,
  1223  					}
  1224  					// enumerate all white list source ip
  1225  					if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
  1226  						klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
  1227  						continue
  1228  					}
  1229  					proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
  1230  				} else {
  1231  					// since nodeIP is not covered in any of SourceRange we need to explicitly block the lbIP access from k8s nodes.
  1232  					proxier.lbNoNodeAccessIPPortProtocolEntries = append(proxier.lbNoNodeAccessIPPortProtocolEntries, entry)
  1233  
  1234  				}
  1235  			}
  1236  			// ipvs call
  1237  			serv := &utilipvs.VirtualServer{
  1238  				Address:   ingress,
  1239  				Port:      uint16(svcInfo.Port()),
  1240  				Protocol:  string(svcInfo.Protocol()),
  1241  				Scheduler: proxier.ipvsScheduler,
  1242  			}
  1243  			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1244  				serv.Flags |= utilipvs.FlagPersistent
  1245  				serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1246  			}
  1247  			// Set the source hash flag needed for the distribution method "mh"
  1248  			if proxier.ipvsScheduler == "mh" {
  1249  				serv.Flags |= utilipvs.FlagSourceHash
  1250  			}
  1251  			// We must not add the address to the dummy device if it exist on another interface
  1252  			shouldBind := !nodeAddressSet.Has(serv.Address.String())
  1253  			if err := proxier.syncService(svcPortNameString, serv, shouldBind, alreadyBoundAddrs); err == nil {
  1254  				activeIPVSServices.Insert(serv.String())
  1255  				if shouldBind {
  1256  					activeBindAddrs.Insert(serv.Address.String())
  1257  				}
  1258  				if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
  1259  					klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
  1260  				}
  1261  			} else {
  1262  				klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
  1263  			}
  1264  		}
  1265  
  1266  		if svcInfo.NodePort() != 0 {
  1267  			if len(nodeIPs) == 0 {
  1268  				// Skip nodePort configuration since an error occurred when
  1269  				// computing nodeAddresses or nodeIPs.
  1270  				continue
  1271  			}
  1272  
  1273  			// Nodeports need SNAT, unless they're local.
  1274  			// ipset call
  1275  
  1276  			var (
  1277  				nodePortSet *IPSet
  1278  				entries     []*utilipset.Entry
  1279  			)
  1280  
  1281  			switch protocol {
  1282  			case utilipset.ProtocolTCP:
  1283  				nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
  1284  				entries = []*utilipset.Entry{{
  1285  					// No need to provide ip info
  1286  					Port:     svcInfo.NodePort(),
  1287  					Protocol: protocol,
  1288  					SetType:  utilipset.BitmapPort,
  1289  				}}
  1290  			case utilipset.ProtocolUDP:
  1291  				nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
  1292  				entries = []*utilipset.Entry{{
  1293  					// No need to provide ip info
  1294  					Port:     svcInfo.NodePort(),
  1295  					Protocol: protocol,
  1296  					SetType:  utilipset.BitmapPort,
  1297  				}}
  1298  			case utilipset.ProtocolSCTP:
  1299  				nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
  1300  				// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
  1301  				entries = []*utilipset.Entry{}
  1302  				for _, nodeIP := range nodeIPs {
  1303  					entries = append(entries, &utilipset.Entry{
  1304  						IP:       nodeIP.String(),
  1305  						Port:     svcInfo.NodePort(),
  1306  						Protocol: protocol,
  1307  						SetType:  utilipset.HashIPPort,
  1308  					})
  1309  				}
  1310  			default:
  1311  				// It should never hit
  1312  				klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
  1313  			}
  1314  			if nodePortSet != nil {
  1315  				entryInvalidErr := false
  1316  				for _, entry := range entries {
  1317  					if valid := nodePortSet.validateEntry(entry); !valid {
  1318  						klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
  1319  						entryInvalidErr = true
  1320  						break
  1321  					}
  1322  					nodePortSet.activeEntries.Insert(entry.String())
  1323  				}
  1324  				if entryInvalidErr {
  1325  					continue
  1326  				}
  1327  			}
  1328  
  1329  			// Add externaltrafficpolicy=local type nodeport entry
  1330  			if svcInfo.ExternalPolicyLocal() {
  1331  				var nodePortLocalSet *IPSet
  1332  				switch protocol {
  1333  				case utilipset.ProtocolTCP:
  1334  					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
  1335  				case utilipset.ProtocolUDP:
  1336  					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
  1337  				case utilipset.ProtocolSCTP:
  1338  					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
  1339  				default:
  1340  					// It should never hit
  1341  					klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
  1342  				}
  1343  				if nodePortLocalSet != nil {
  1344  					entryInvalidErr := false
  1345  					for _, entry := range entries {
  1346  						if valid := nodePortLocalSet.validateEntry(entry); !valid {
  1347  							klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortLocalSet.Name)
  1348  							entryInvalidErr = true
  1349  							break
  1350  						}
  1351  						nodePortLocalSet.activeEntries.Insert(entry.String())
  1352  					}
  1353  					if entryInvalidErr {
  1354  						continue
  1355  					}
  1356  				}
  1357  			}
  1358  
  1359  			// Build ipvs kernel routes for each node ip address
  1360  			for _, nodeIP := range nodeIPs {
  1361  				// ipvs call
  1362  				serv := &utilipvs.VirtualServer{
  1363  					Address:   nodeIP,
  1364  					Port:      uint16(svcInfo.NodePort()),
  1365  					Protocol:  string(svcInfo.Protocol()),
  1366  					Scheduler: proxier.ipvsScheduler,
  1367  				}
  1368  				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1369  					serv.Flags |= utilipvs.FlagPersistent
  1370  					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1371  				}
  1372  				// Set the source hash flag needed for the distribution method "mh"
  1373  				if proxier.ipvsScheduler == "mh" {
  1374  					serv.Flags |= utilipvs.FlagSourceHash
  1375  				}
  1376  				// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
  1377  				if err := proxier.syncService(svcPortNameString, serv, false, alreadyBoundAddrs); err == nil {
  1378  					activeIPVSServices.Insert(serv.String())
  1379  					if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
  1380  						klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
  1381  					}
  1382  				} else {
  1383  					klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
  1384  				}
  1385  			}
  1386  		}
  1387  
  1388  		if svcInfo.HealthCheckNodePort() != 0 {
  1389  			nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
  1390  			entry := &utilipset.Entry{
  1391  				// No need to provide ip info
  1392  				Port:     svcInfo.HealthCheckNodePort(),
  1393  				Protocol: "tcp",
  1394  				SetType:  utilipset.BitmapPort,
  1395  			}
  1396  
  1397  			if valid := nodePortSet.validateEntry(entry); !valid {
  1398  				klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
  1399  				continue
  1400  			}
  1401  			nodePortSet.activeEntries.Insert(entry.String())
  1402  		}
  1403  	}
  1404  
  1405  	// Set the KUBE-IPVS-IPS set to the "activeBindAddrs"
  1406  	proxier.ipsetList[kubeIPVSSet].activeEntries = activeBindAddrs
  1407  
  1408  	// sync ipset entries
  1409  	for _, set := range proxier.ipsetList {
  1410  		set.syncIPSetEntries()
  1411  	}
  1412  
  1413  	// Tail call iptables rules for ipset, make sure only call iptables once
  1414  	// in a single loop per ip set.
  1415  	proxier.writeIptablesRules()
  1416  
  1417  	// Sync iptables rules.
  1418  	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
  1419  	proxier.iptablesData.Reset()
  1420  	proxier.iptablesData.Write(proxier.natChains.Bytes())
  1421  	proxier.iptablesData.Write(proxier.natRules.Bytes())
  1422  	proxier.iptablesData.Write(proxier.filterChains.Bytes())
  1423  	proxier.iptablesData.Write(proxier.filterRules.Bytes())
  1424  
  1425  	klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
  1426  	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
  1427  	if err != nil {
  1428  		if pErr, ok := err.(utiliptables.ParseError); ok {
  1429  			lines := utiliptables.ExtractLines(proxier.iptablesData.Bytes(), pErr.Line(), 3)
  1430  			klog.ErrorS(pErr, "Failed to execute iptables-restore", "rules", lines)
  1431  		} else {
  1432  			klog.ErrorS(err, "Failed to execute iptables-restore", "rules", proxier.iptablesData.Bytes())
  1433  		}
  1434  		metrics.IptablesRestoreFailuresTotal.Inc()
  1435  		return
  1436  	}
  1437  	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
  1438  		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
  1439  			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
  1440  			metrics.NetworkProgrammingLatency.Observe(latency)
  1441  			klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
  1442  		}
  1443  	}
  1444  
  1445  	// Remove superfluous addresses from the dummy device
  1446  	superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs)
  1447  	if superfluousAddresses.Len() > 0 {
  1448  		klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses)
  1449  		for adr := range superfluousAddresses {
  1450  			if err := proxier.netlinkHandle.UnbindAddress(adr, defaultDummyDevice); err != nil {
  1451  				klog.ErrorS(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr)
  1452  			}
  1453  		}
  1454  	}
  1455  
  1456  	// currentIPVSServices represent IPVS services listed from the system
  1457  	// (including any we have created in this sync)
  1458  	currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
  1459  	appliedSvcs, err := proxier.ipvs.GetVirtualServers()
  1460  	if err == nil {
  1461  		for _, appliedSvc := range appliedSvcs {
  1462  			currentIPVSServices[appliedSvc.String()] = appliedSvc
  1463  		}
  1464  	} else {
  1465  		klog.ErrorS(err, "Failed to get ipvs service")
  1466  	}
  1467  	proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
  1468  
  1469  	if proxier.healthzServer != nil {
  1470  		proxier.healthzServer.Updated(proxier.ipFamily)
  1471  	}
  1472  	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
  1473  
  1474  	// Update service healthchecks.  The endpoints list might include services that are
  1475  	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
  1476  	// will just drop those endpoints.
  1477  	if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
  1478  		klog.ErrorS(err, "Error syncing healthcheck services")
  1479  	}
  1480  	if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
  1481  		klog.ErrorS(err, "Error syncing healthcheck endpoints")
  1482  	}
  1483  
  1484  	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
  1485  	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
  1486  
  1487  	// Finish housekeeping, clear stale conntrack entries for UDP Services
  1488  	conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
  1489  }
  1490  
  1491  // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
  1492  // according to proxier.ipsetList information and the ipset match relationship that `ipsetWithIptablesChain` specified.
  1493  // some ipset(kubeClusterIPSet for example) have particular match rules and iptables jump relation should be sync separately.
  1494  func (proxier *Proxier) writeIptablesRules() {
  1495  
  1496  	// Dismiss connects to localhost early in the service chain
  1497  	loAddr := "127.0.0.0/8"
  1498  	if proxier.ipFamily == v1.IPv6Protocol {
  1499  		loAddr = "::1/128"
  1500  	}
  1501  	proxier.natRules.Write("-A", string(kubeServicesChain), "-s", loAddr, "-j", "RETURN")
  1502  
  1503  	// We are creating those slices ones here to avoid memory reallocations
  1504  	// in every loop. Note that reuse the memory, instead of doing:
  1505  	//   slice = <some new slice>
  1506  	// you should always do one of the below:
  1507  	//   slice = slice[:0] // and then append to it
  1508  	//   slice = append(slice[:0], ...)
  1509  	// To avoid growing this slice, we arbitrarily set its size to 64,
  1510  	// there is never more than that many arguments for a single line.
  1511  	// Note that even if we go over 64, it will still be correct - it
  1512  	// is just for efficiency, not correctness.
  1513  	args := make([]string, 64)
  1514  
  1515  	for _, set := range ipsetWithIptablesChain {
  1516  		if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
  1517  			args = append(args[:0], "-A", set.from)
  1518  			if set.protocolMatch != "" {
  1519  				args = append(args, "-p", set.protocolMatch)
  1520  			}
  1521  			args = append(args,
  1522  				"-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
  1523  				"-m", "set", "--match-set", proxier.ipsetList[set.name].Name,
  1524  				set.matchType,
  1525  			)
  1526  			if set.table == utiliptables.TableFilter {
  1527  				proxier.filterRules.Write(args, "-j", set.to)
  1528  			} else {
  1529  				proxier.natRules.Write(args, "-j", set.to)
  1530  			}
  1531  		}
  1532  	}
  1533  
  1534  	if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
  1535  		args = append(args[:0],
  1536  			"-A", string(kubeServicesChain),
  1537  			"-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
  1538  			"-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name,
  1539  		)
  1540  		if proxier.masqueradeAll {
  1541  			proxier.natRules.Write(
  1542  				args, "dst,dst",
  1543  				"-j", string(kubeMarkMasqChain))
  1544  		} else if proxier.localDetector.IsImplemented() {
  1545  			// This masquerades off-cluster traffic to a service VIP.  The idea
  1546  			// is that you can establish a static route for your Service range,
  1547  			// routing to any node, and that node will bridge into the Service
  1548  			// for you.  Since that might bounce off-node, we masquerade here.
  1549  			// If/when we support "Local" policy for VIPs, we should update this.
  1550  			proxier.natRules.Write(
  1551  				args, "dst,dst",
  1552  				proxier.localDetector.IfNotLocal(),
  1553  				"-j", string(kubeMarkMasqChain))
  1554  		} else {
  1555  			// Masquerade all OUTPUT traffic coming from a service ip.
  1556  			// The kube dummy interface has all service VIPs assigned which
  1557  			// results in the service VIP being picked as the source IP to reach
  1558  			// a VIP. This leads to a connection from VIP:<random port> to
  1559  			// VIP:<service port>.
  1560  			// Always masquerading OUTPUT (node-originating) traffic with a VIP
  1561  			// source ip and service port destination fixes the outgoing connections.
  1562  			proxier.natRules.Write(
  1563  				args, "src,dst",
  1564  				"-j", string(kubeMarkMasqChain))
  1565  		}
  1566  	}
  1567  
  1568  	// externalIPRules adds iptables rules applies to Service ExternalIPs
  1569  	externalIPRules := func(args []string) {
  1570  		// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
  1571  		// nor from a local process to be forwarded to the service.
  1572  		// This rule roughly translates to "all traffic from off-machine".
  1573  		// This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
  1574  		externalTrafficOnlyArgs := append(args,
  1575  			"-m", "physdev", "!", "--physdev-is-in",
  1576  			"-m", "addrtype", "!", "--src-type", "LOCAL")
  1577  		proxier.natRules.Write(externalTrafficOnlyArgs, "-j", "ACCEPT")
  1578  		dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
  1579  		// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
  1580  		// This covers cases like GCE load-balancers which get added to the local routing table.
  1581  		proxier.natRules.Write(dstLocalOnlyArgs, "-j", "ACCEPT")
  1582  	}
  1583  
  1584  	if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
  1585  		// Build masquerade rules for packets to external IPs.
  1586  		args = append(args[:0],
  1587  			"-A", string(kubeServicesChain),
  1588  			"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
  1589  			"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
  1590  			"dst,dst",
  1591  		)
  1592  		proxier.natRules.Write(args, "-j", string(kubeMarkMasqChain))
  1593  		externalIPRules(args)
  1594  	}
  1595  
  1596  	if !proxier.ipsetList[kubeExternalIPLocalSet].isEmpty() {
  1597  		args = append(args[:0],
  1598  			"-A", string(kubeServicesChain),
  1599  			"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPLocalSet].getComment(),
  1600  			"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name,
  1601  			"dst,dst",
  1602  		)
  1603  		externalIPRules(args)
  1604  	}
  1605  
  1606  	// -A KUBE-SERVICES  -m addrtype  --dst-type LOCAL -j KUBE-NODE-PORT
  1607  	args = append(args[:0],
  1608  		"-A", string(kubeServicesChain),
  1609  		"-m", "addrtype", "--dst-type", "LOCAL",
  1610  	)
  1611  	proxier.natRules.Write(args, "-j", string(kubeNodePortChain))
  1612  
  1613  	// mark for masquerading for KUBE-LOAD-BALANCER
  1614  	proxier.natRules.Write(
  1615  		"-A", string(kubeLoadBalancerChain),
  1616  		"-j", string(kubeMarkMasqChain),
  1617  	)
  1618  
  1619  	// drop packets filtered by KUBE-SOURCE-RANGES-FIREWALL
  1620  	proxier.filterRules.Write(
  1621  		"-A", string(kubeSourceRangesFirewallChain),
  1622  		"-j", "DROP",
  1623  	)
  1624  
  1625  	// disable LB access from node
  1626  	// for IPVS src and dst both would be lbIP
  1627  	for _, entry := range proxier.lbNoNodeAccessIPPortProtocolEntries {
  1628  		proxier.filterRules.Write(
  1629  			"-A", string(kubeIPVSOutFilterChain),
  1630  			"-s", entry.IP,
  1631  			"-m", "ipvs", "--vaddr", entry.IP, "--vproto", entry.Protocol, "--vport", strconv.Itoa(entry.Port),
  1632  			"-j", "DROP",
  1633  		)
  1634  	}
  1635  
  1636  	// Accept all traffic with destination of ipvs virtual service, in case other iptables rules
  1637  	// block the traffic, that may result in ipvs rules invalid.
  1638  	// Those rules must be in the end of KUBE-SERVICE chain
  1639  	proxier.acceptIPVSTraffic()
  1640  
  1641  	// If the masqueradeMark has been added then we want to forward that same
  1642  	// traffic, this allows NodePort traffic to be forwarded even if the default
  1643  	// FORWARD policy is not accept.
  1644  	proxier.filterRules.Write(
  1645  		"-A", string(kubeForwardChain),
  1646  		"-m", "comment", "--comment", `"kubernetes forwarding rules"`,
  1647  		"-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
  1648  		"-j", "ACCEPT",
  1649  	)
  1650  
  1651  	// The following rule ensures the traffic after the initial packet accepted
  1652  	// by the "kubernetes forwarding rules" rule above will be accepted.
  1653  	proxier.filterRules.Write(
  1654  		"-A", string(kubeForwardChain),
  1655  		"-m", "comment", "--comment", `"kubernetes forwarding conntrack rule"`,
  1656  		"-m", "conntrack",
  1657  		"--ctstate", "RELATED,ESTABLISHED",
  1658  		"-j", "ACCEPT",
  1659  	)
  1660  
  1661  	// Add rule to accept traffic towards health check node port
  1662  	proxier.filterRules.Write(
  1663  		"-A", string(kubeNodePortChain),
  1664  		"-m", "comment", "--comment", proxier.ipsetList[kubeHealthCheckNodePortSet].getComment(),
  1665  		"-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst",
  1666  		"-j", "ACCEPT",
  1667  	)
  1668  
  1669  	// Add rules to the filter/KUBE-IPVS-FILTER chain to prevent access to ports on the host through VIP addresses.
  1670  	// https://github.com/kubernetes/kubernetes/issues/72236
  1671  	proxier.filterRules.Write(
  1672  		"-A", string(kubeIPVSFilterChain),
  1673  		"-m", "set", "--match-set", proxier.ipsetList[kubeLoadBalancerSet].Name, "dst,dst", "-j", "RETURN")
  1674  	proxier.filterRules.Write(
  1675  		"-A", string(kubeIPVSFilterChain),
  1676  		"-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name, "dst,dst", "-j", "RETURN")
  1677  	proxier.filterRules.Write(
  1678  		"-A", string(kubeIPVSFilterChain),
  1679  		"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, "dst,dst", "-j", "RETURN")
  1680  	proxier.filterRules.Write(
  1681  		"-A", string(kubeIPVSFilterChain),
  1682  		"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name, "dst,dst", "-j", "RETURN")
  1683  	proxier.filterRules.Write(
  1684  		"-A", string(kubeIPVSFilterChain),
  1685  		"-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst", "-j", "RETURN")
  1686  	proxier.filterRules.Write(
  1687  		"-A", string(kubeIPVSFilterChain),
  1688  		"-m", "conntrack", "--ctstate", "NEW",
  1689  		"-m", "set", "--match-set", proxier.ipsetList[kubeIPVSSet].Name, "dst", "-j", "REJECT")
  1690  
  1691  	// Install the kubernetes-specific postrouting rules. We use a whole chain for
  1692  	// this so that it is easier to flush and change, for example if the mark
  1693  	// value should ever change.
  1694  
  1695  	proxier.natRules.Write(
  1696  		"-A", string(kubePostroutingChain),
  1697  		"-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
  1698  		"-j", "RETURN",
  1699  	)
  1700  	// Clear the mark to avoid re-masquerading if the packet re-traverses the network stack.
  1701  	proxier.natRules.Write(
  1702  		"-A", string(kubePostroutingChain),
  1703  		// XOR proxier.masqueradeMark to unset it
  1704  		"-j", "MARK", "--xor-mark", proxier.masqueradeMark,
  1705  	)
  1706  	masqRule := []string{
  1707  		"-A", string(kubePostroutingChain),
  1708  		"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
  1709  		"-j", "MASQUERADE",
  1710  	}
  1711  	if proxier.iptables.HasRandomFully() {
  1712  		masqRule = append(masqRule, "--random-fully")
  1713  	}
  1714  	proxier.natRules.Write(masqRule)
  1715  
  1716  	// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
  1717  	// this so that it is easier to flush and change, for example if the mark
  1718  	// value should ever change.
  1719  	proxier.natRules.Write(
  1720  		"-A", string(kubeMarkMasqChain),
  1721  		"-j", "MARK", "--or-mark", proxier.masqueradeMark,
  1722  	)
  1723  
  1724  	// Write the end-of-table markers.
  1725  	proxier.filterRules.Write("COMMIT")
  1726  	proxier.natRules.Write("COMMIT")
  1727  }
  1728  
  1729  func (proxier *Proxier) acceptIPVSTraffic() {
  1730  	sets := []string{kubeClusterIPSet, kubeLoadBalancerSet}
  1731  	for _, set := range sets {
  1732  		var matchType string
  1733  		if !proxier.ipsetList[set].isEmpty() {
  1734  			switch proxier.ipsetList[set].SetType {
  1735  			case utilipset.BitmapPort:
  1736  				matchType = "dst"
  1737  			default:
  1738  				matchType = "dst,dst"
  1739  			}
  1740  			proxier.natRules.Write(
  1741  				"-A", string(kubeServicesChain),
  1742  				"-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType,
  1743  				"-j", "ACCEPT",
  1744  			)
  1745  		}
  1746  	}
  1747  }
  1748  
  1749  // createAndLinkKubeChain create all kube chains that ipvs proxier need and write basic link.
  1750  func (proxier *Proxier) createAndLinkKubeChain() {
  1751  	for _, ch := range iptablesChains {
  1752  		if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
  1753  			klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
  1754  			return
  1755  		}
  1756  		if ch.table == utiliptables.TableNAT {
  1757  			proxier.natChains.Write(utiliptables.MakeChainLine(ch.chain))
  1758  		} else {
  1759  			proxier.filterChains.Write(utiliptables.MakeChainLine(ch.chain))
  1760  		}
  1761  	}
  1762  
  1763  	for _, jc := range iptablesJumpChain {
  1764  		args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
  1765  		if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
  1766  			klog.ErrorS(err, "Failed to ensure chain jumps", "table", jc.table, "srcChain", jc.from, "dstChain", jc.to)
  1767  		}
  1768  	}
  1769  
  1770  }
  1771  
  1772  func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, alreadyBoundAddrs sets.Set[string]) error {
  1773  	appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
  1774  	if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
  1775  		if appliedVirtualServer == nil {
  1776  			// IPVS service is not found, create a new service
  1777  			klog.V(3).InfoS("Adding new service", "serviceName", svcName, "virtualServer", vs)
  1778  			if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
  1779  				klog.ErrorS(err, "Failed to add IPVS service", "serviceName", svcName)
  1780  				return err
  1781  			}
  1782  		} else {
  1783  			// IPVS service was changed, update the existing one
  1784  			// During updates, service VIP will not go down
  1785  			klog.V(3).InfoS("IPVS service was changed", "serviceName", svcName)
  1786  			if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
  1787  				klog.ErrorS(err, "Failed to update IPVS service")
  1788  				return err
  1789  			}
  1790  		}
  1791  	}
  1792  
  1793  	// bind service address to dummy interface
  1794  	if bindAddr {
  1795  		// always attempt to bind if alreadyBoundAddrs is nil,
  1796  		// otherwise check if it's already binded and return early
  1797  		if alreadyBoundAddrs != nil && alreadyBoundAddrs.Has(vs.Address.String()) {
  1798  			return nil
  1799  		}
  1800  
  1801  		klog.V(4).InfoS("Bind address", "address", vs.Address)
  1802  		_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), defaultDummyDevice)
  1803  		if err != nil {
  1804  			klog.ErrorS(err, "Failed to bind service address to dummy device", "serviceName", svcName)
  1805  			return err
  1806  		}
  1807  	}
  1808  
  1809  	return nil
  1810  }
  1811  
  1812  func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
  1813  	appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
  1814  	if err != nil {
  1815  		klog.ErrorS(err, "Failed to get IPVS service")
  1816  		return err
  1817  	}
  1818  	if appliedVirtualServer == nil {
  1819  		return errors.New("IPVS virtual service does not exist")
  1820  	}
  1821  
  1822  	// curEndpoints represents IPVS destinations listed from current system.
  1823  	curEndpoints := sets.New[string]()
  1824  	curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
  1825  	if err != nil {
  1826  		klog.ErrorS(err, "Failed to list IPVS destinations")
  1827  		return err
  1828  	}
  1829  	for _, des := range curDests {
  1830  		curEndpoints.Insert(des.String())
  1831  	}
  1832  
  1833  	endpoints := proxier.endpointsMap[svcPortName]
  1834  
  1835  	// Filtering for topology aware endpoints. This function will only
  1836  	// filter endpoints if appropriate feature gates are enabled and the
  1837  	// Service does not have conflicting configuration such as
  1838  	// externalTrafficPolicy=Local.
  1839  	svcInfo, ok := proxier.svcPortMap[svcPortName]
  1840  	if !ok {
  1841  		klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
  1842  	} else {
  1843  		clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
  1844  		if onlyNodeLocalEndpoints {
  1845  			if len(localEndpoints) > 0 {
  1846  				endpoints = localEndpoints
  1847  			} else {
  1848  				// https://github.com/kubernetes/kubernetes/pull/97081
  1849  				// Allow access from local PODs even if no local endpoints exist.
  1850  				// Traffic from an external source will be routed but the reply
  1851  				// will have the POD address and will be discarded.
  1852  				endpoints = clusterEndpoints
  1853  
  1854  				if hasAnyEndpoints && svcInfo.InternalPolicyLocal() {
  1855  					proxier.serviceNoLocalEndpointsInternal.Insert(svcPortName.NamespacedName.String())
  1856  				}
  1857  
  1858  				if hasAnyEndpoints && svcInfo.ExternalPolicyLocal() {
  1859  					proxier.serviceNoLocalEndpointsExternal.Insert(svcPortName.NamespacedName.String())
  1860  				}
  1861  			}
  1862  		} else {
  1863  			endpoints = clusterEndpoints
  1864  		}
  1865  	}
  1866  
  1867  	newEndpoints := sets.New[string]()
  1868  	for _, epInfo := range endpoints {
  1869  		newEndpoints.Insert(epInfo.String())
  1870  	}
  1871  
  1872  	// Create new endpoints
  1873  	for _, ep := range newEndpoints.UnsortedList() {
  1874  		ip, port, err := net.SplitHostPort(ep)
  1875  		if err != nil {
  1876  			klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
  1877  			continue
  1878  		}
  1879  		portNum, err := strconv.Atoi(port)
  1880  		if err != nil {
  1881  			klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
  1882  			continue
  1883  		}
  1884  
  1885  		newDest := &utilipvs.RealServer{
  1886  			Address: netutils.ParseIPSloppy(ip),
  1887  			Port:    uint16(portNum),
  1888  			Weight:  1,
  1889  		}
  1890  
  1891  		if curEndpoints.Has(ep) {
  1892  			// if we are syncing for the first time, loop through all current destinations and
  1893  			// reset their weight.
  1894  			if proxier.initialSync {
  1895  				for _, dest := range curDests {
  1896  					if dest.Weight != newDest.Weight {
  1897  						err = proxier.ipvs.UpdateRealServer(appliedVirtualServer, newDest)
  1898  						if err != nil {
  1899  							klog.ErrorS(err, "Failed to update destination", "newDest", newDest)
  1900  							continue
  1901  						}
  1902  					}
  1903  				}
  1904  			}
  1905  			// check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
  1906  			uniqueRS := GetUniqueRSName(vs, newDest)
  1907  			if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  1908  				continue
  1909  			}
  1910  			klog.V(5).InfoS("new ep is in graceful delete list", "uniqueRealServer", uniqueRS)
  1911  			err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
  1912  			if err != nil {
  1913  				klog.ErrorS(err, "Failed to delete endpoint in gracefulDeleteQueue", "endpoint", ep)
  1914  				continue
  1915  			}
  1916  		}
  1917  		err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
  1918  		if err != nil {
  1919  			klog.ErrorS(err, "Failed to add destination", "newDest", newDest)
  1920  			continue
  1921  		}
  1922  	}
  1923  
  1924  	// Delete old endpoints
  1925  	for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
  1926  		// if curEndpoint is in gracefulDelete, skip
  1927  		uniqueRS := vs.String() + "/" + ep
  1928  		if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  1929  			continue
  1930  		}
  1931  		ip, port, err := net.SplitHostPort(ep)
  1932  		if err != nil {
  1933  			klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
  1934  			continue
  1935  		}
  1936  		portNum, err := strconv.Atoi(port)
  1937  		if err != nil {
  1938  			klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
  1939  			continue
  1940  		}
  1941  
  1942  		delDest := &utilipvs.RealServer{
  1943  			Address: netutils.ParseIPSloppy(ip),
  1944  			Port:    uint16(portNum),
  1945  		}
  1946  
  1947  		klog.V(5).InfoS("Using graceful delete", "uniqueRealServer", uniqueRS)
  1948  		err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
  1949  		if err != nil {
  1950  			klog.ErrorS(err, "Failed to delete destination", "uniqueRealServer", uniqueRS)
  1951  			continue
  1952  		}
  1953  	}
  1954  	return nil
  1955  }
  1956  
  1957  func (proxier *Proxier) cleanLegacyService(activeServices sets.Set[string], currentServices map[string]*utilipvs.VirtualServer) {
  1958  	for cs, svc := range currentServices {
  1959  		if proxier.isIPInExcludeCIDRs(svc.Address) {
  1960  			continue
  1961  		}
  1962  		if getIPFamily(svc.Address) != proxier.ipFamily {
  1963  			// Not our family
  1964  			continue
  1965  		}
  1966  		if !activeServices.Has(cs) {
  1967  			klog.V(4).InfoS("Delete service", "virtualServer", svc)
  1968  			if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
  1969  				klog.ErrorS(err, "Failed to delete service", "virtualServer", svc)
  1970  			}
  1971  		}
  1972  	}
  1973  }
  1974  
  1975  func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool {
  1976  	// make sure it does not fall within an excluded CIDR range.
  1977  	for _, excludedCIDR := range proxier.excludeCIDRs {
  1978  		if excludedCIDR.Contains(ip) {
  1979  			return true
  1980  		}
  1981  	}
  1982  	return false
  1983  }
  1984  
  1985  func getIPFamily(ip net.IP) v1.IPFamily {
  1986  	if netutils.IsIPv4(ip) {
  1987  		return v1.IPv4Protocol
  1988  	}
  1989  	return v1.IPv6Protocol
  1990  }
  1991  
  1992  // ipvs Proxier falls back on iptables when it needs to do SNAT for egress packets.
  1993  // It will only operate on the iptables *nat table.
  1994  // Create and link the kube postrouting chain for SNAT packets.
  1995  // Chain POSTROUTING (policy ACCEPT)
  1996  // target     prot opt source               destination
  1997  // KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */
  1998  // Maintained by kubelet network sync loop
  1999  
  2000  // *nat
  2001  // :KUBE-POSTROUTING - [0:0]
  2002  // Chain KUBE-POSTROUTING (1 references)
  2003  // target     prot opt source               destination
  2004  // MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
  2005  
  2006  // :KUBE-MARK-MASQ - [0:0]
  2007  // Chain KUBE-MARK-MASQ (0 references)
  2008  // target     prot opt source               destination
  2009  // MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000
  2010  

View as plain text