...

Source file src/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go

Documentation: k8s.io/kubernetes/pkg/proxy/iptables

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2015 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package iptables
    21  
    22  //
    23  // NOTE: this needs to be tested in e2e since it uses iptables for everything.
    24  //
    25  
    26  import (
    27  	"bytes"
    28  	"crypto/sha256"
    29  	"encoding/base32"
    30  	"fmt"
    31  	"net"
    32  	"reflect"
    33  	"strconv"
    34  	"strings"
    35  	"sync"
    36  	"sync/atomic"
    37  	"time"
    38  
    39  	v1 "k8s.io/api/core/v1"
    40  	discovery "k8s.io/api/discovery/v1"
    41  	"k8s.io/apimachinery/pkg/types"
    42  	"k8s.io/apimachinery/pkg/util/sets"
    43  	"k8s.io/apimachinery/pkg/util/wait"
    44  	"k8s.io/client-go/tools/events"
    45  	utilsysctl "k8s.io/component-helpers/node/util/sysctl"
    46  	"k8s.io/klog/v2"
    47  	"k8s.io/kubernetes/pkg/proxy"
    48  	"k8s.io/kubernetes/pkg/proxy/conntrack"
    49  	"k8s.io/kubernetes/pkg/proxy/healthcheck"
    50  	"k8s.io/kubernetes/pkg/proxy/metaproxier"
    51  	"k8s.io/kubernetes/pkg/proxy/metrics"
    52  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    53  	proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
    54  	"k8s.io/kubernetes/pkg/util/async"
    55  	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
    56  	utilexec "k8s.io/utils/exec"
    57  )
    58  
    59  const (
    60  	// the services chain
    61  	kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
    62  
    63  	// the external services chain
    64  	kubeExternalServicesChain utiliptables.Chain = "KUBE-EXTERNAL-SERVICES"
    65  
    66  	// the nodeports chain
    67  	kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
    68  
    69  	// the kubernetes postrouting chain
    70  	kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
    71  
    72  	// kubeMarkMasqChain is the mark-for-masquerade chain
    73  	kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
    74  
    75  	// the kubernetes forward chain
    76  	kubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
    77  
    78  	// kubeProxyFirewallChain is the kube-proxy firewall chain
    79  	kubeProxyFirewallChain utiliptables.Chain = "KUBE-PROXY-FIREWALL"
    80  
    81  	// kube proxy canary chain is used for monitoring rule reload
    82  	kubeProxyCanaryChain utiliptables.Chain = "KUBE-PROXY-CANARY"
    83  
    84  	// kubeletFirewallChain is a duplicate of kubelet's firewall containing
    85  	// the anti-martian-packet rule. It should not be used for any other
    86  	// rules.
    87  	kubeletFirewallChain utiliptables.Chain = "KUBE-FIREWALL"
    88  
    89  	// largeClusterEndpointsThreshold is the number of endpoints at which
    90  	// we switch into "large cluster mode" and optimize for iptables
    91  	// performance over iptables debuggability
    92  	largeClusterEndpointsThreshold = 1000
    93  )
    94  
    95  const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
    96  const sysctlNFConntrackTCPBeLiberal = "net/netfilter/nf_conntrack_tcp_be_liberal"
    97  
    98  // NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
    99  func NewDualStackProxier(
   100  	ipt [2]utiliptables.Interface,
   101  	sysctl utilsysctl.Interface,
   102  	exec utilexec.Interface,
   103  	syncPeriod time.Duration,
   104  	minSyncPeriod time.Duration,
   105  	masqueradeAll bool,
   106  	localhostNodePorts bool,
   107  	masqueradeBit int,
   108  	localDetectors [2]proxyutiliptables.LocalTrafficDetector,
   109  	hostname string,
   110  	nodeIPs map[v1.IPFamily]net.IP,
   111  	recorder events.EventRecorder,
   112  	healthzServer *healthcheck.ProxierHealthServer,
   113  	nodePortAddresses []string,
   114  	initOnly bool,
   115  ) (proxy.Provider, error) {
   116  	// Create an ipv4 instance of the single-stack proxier
   117  	ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], sysctl,
   118  		exec, syncPeriod, minSyncPeriod, masqueradeAll, localhostNodePorts, masqueradeBit, localDetectors[0], hostname,
   119  		nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
   120  	if err != nil {
   121  		return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
   122  	}
   123  
   124  	ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], sysctl,
   125  		exec, syncPeriod, minSyncPeriod, masqueradeAll, false, masqueradeBit, localDetectors[1], hostname,
   126  		nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
   127  	if err != nil {
   128  		return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
   129  	}
   130  	if initOnly {
   131  		return nil, nil
   132  	}
   133  	return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
   134  }
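As a rough illustration (not part of proxier.go) of how the per-family arguments line up: index 0 of the ipt and localDetectors arrays pairs with IPv4 and index 1 with IPv6, matching the keys of the nodeIPs map. The addresses below are made-up documentation-range examples.

	package main

	import (
		"fmt"
		"net"

		v1 "k8s.io/api/core/v1"
	)

	func main() {
		// Hypothetical node addresses; v1.IPv4Protocol pairs with ipt[0]/localDetectors[0],
		// v1.IPv6Protocol with ipt[1]/localDetectors[1], as NewDualStackProxier expects.
		nodeIPs := map[v1.IPFamily]net.IP{
			v1.IPv4Protocol: net.ParseIP("192.0.2.10"),
			v1.IPv6Protocol: net.ParseIP("2001:db8::10"),
		}
		fmt.Println(nodeIPs[v1.IPv4Protocol], nodeIPs[v1.IPv6Protocol])
	}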
   135  
   136  // Proxier is an iptables-based proxy for connections between a localhost:lport
   137  // and services that provide the actual backends.
   138  type Proxier struct {
   139  	// ipFamily defines the IP family which this proxier is tracking.
   140  	ipFamily v1.IPFamily
   141  
   142  	// endpointsChanges and serviceChanges contain all changes to endpoints and
   143  	// services that happened since iptables was last synced. For a single object,
   144  	// changes are accumulated, i.e. previous is the state from before all of them,
   145  	// current is the state after applying all of those.
   146  	endpointsChanges *proxy.EndpointsChangeTracker
   147  	serviceChanges   *proxy.ServiceChangeTracker
   148  
   149  	mu           sync.Mutex // protects the following fields
   150  	svcPortMap   proxy.ServicePortMap
   151  	endpointsMap proxy.EndpointsMap
   152  	nodeLabels   map[string]string
   153  	// endpointSlicesSynced and servicesSynced are set to true
   154  	// when the corresponding objects are synced after startup. This is used to avoid
   155  	// updating iptables with partial data after a kube-proxy restart.
   156  	endpointSlicesSynced bool
   157  	servicesSynced       bool
   158  	needFullSync         bool
   159  	initialized          int32
   160  	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
   161  	syncPeriod           time.Duration
   162  	lastIPTablesCleanup  time.Time
   163  
   164  	// These are effectively const and do not need the mutex to be held.
   165  	iptables       utiliptables.Interface
   166  	masqueradeAll  bool
   167  	masqueradeMark string
   168  	conntrack      conntrack.Interface
   169  	localDetector  proxyutiliptables.LocalTrafficDetector
   170  	hostname       string
   171  	nodeIP         net.IP
   172  	recorder       events.EventRecorder
   173  
   174  	serviceHealthServer healthcheck.ServiceHealthServer
   175  	healthzServer       *healthcheck.ProxierHealthServer
   176  
   177  	// Since converting probabilities (floats) to strings is expensive
   178  	// and we only use probabilities of the form 1/n, we precompute
   179  	// some number of them and cache them for future reuse.
   180  	precomputedProbabilities []string
   181  
   182  	// The following buffers are used to reuse memory and avoid allocations
   183  	// that would otherwise significantly impact performance.
   184  	iptablesData             *bytes.Buffer
   185  	existingFilterChainsData *bytes.Buffer
   186  	filterChains             proxyutil.LineBuffer
   187  	filterRules              proxyutil.LineBuffer
   188  	natChains                proxyutil.LineBuffer
   189  	natRules                 proxyutil.LineBuffer
   190  
   191  	// largeClusterMode is set at the beginning of syncProxyRules if we are
   192  	// going to end up outputting "lots" of iptables rules and so we need to
   193  	// optimize for performance over debuggability.
   194  	largeClusterMode bool
   195  
   196  	// localhostNodePorts indicates whether we allow NodePort services to be accessed
   197  	// via localhost.
   198  	localhostNodePorts bool
   199  
   200  	// conntrackTCPLiberal indicates whether the kernel nf_conntrack_tcp_be_liberal sysctl is set.
   201  	conntrackTCPLiberal bool
   202  
   203  	// nodePortAddresses selects the interfaces where nodePort works.
   204  	nodePortAddresses *proxyutil.NodePortAddresses
   205  	// networkInterfacer defines an interface for several net library functions.
   206  	// Inject for test purpose.
   207  	networkInterfacer proxyutil.NetworkInterfacer
   208  }
   209  
   210  // Proxier implements proxy.Provider
   211  var _ proxy.Provider = &Proxier{}
   212  
   213  // NewProxier returns a new Proxier given an iptables Interface instance.
   214  // Because of the iptables logic, it is assumed that there is only a single Proxier active on a machine.
   215  // An error will be returned if iptables fails to update or acquire the initial lock.
   216  // Once a proxier is created, it will keep iptables up to date in the background and
   217  // will not terminate if a particular iptables call fails.
   218  func NewProxier(ipFamily v1.IPFamily,
   219  	ipt utiliptables.Interface,
   220  	sysctl utilsysctl.Interface,
   221  	exec utilexec.Interface,
   222  	syncPeriod time.Duration,
   223  	minSyncPeriod time.Duration,
   224  	masqueradeAll bool,
   225  	localhostNodePorts bool,
   226  	masqueradeBit int,
   227  	localDetector proxyutiliptables.LocalTrafficDetector,
   228  	hostname string,
   229  	nodeIP net.IP,
   230  	recorder events.EventRecorder,
   231  	healthzServer *healthcheck.ProxierHealthServer,
   232  	nodePortAddressStrings []string,
   233  	initOnly bool,
   234  ) (*Proxier, error) {
   235  	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings, nil)
   236  
   237  	if !nodePortAddresses.ContainsIPv4Loopback() {
   238  		localhostNodePorts = false
   239  	}
   240  	if localhostNodePorts {
   241  		// Set the route_localnet sysctl we need for exposing NodePorts on loopback addresses
   242  		// Refer to https://issues.k8s.io/90259
   243  		klog.InfoS("Setting route_localnet=1 to allow node-ports on localhost; to change this either disable iptables.localhostNodePorts (--iptables-localhost-nodeports) or set nodePortAddresses (--nodeport-addresses) to filter loopback addresses")
   244  		if err := proxyutil.EnsureSysctl(sysctl, sysctlRouteLocalnet, 1); err != nil {
   245  			return nil, err
   246  		}
   247  	}
   248  
   249  	// Be conservative in what you do, be liberal in what you accept from others.
   250  	// If it's non-zero, we mark only out-of-window RST segments as INVALID.
   251  	// Ref: https://docs.kernel.org/networking/nf_conntrack-sysctl.html
   252  	conntrackTCPLiberal := false
   253  	if val, err := sysctl.GetSysctl(sysctlNFConntrackTCPBeLiberal); err == nil && val != 0 {
   254  		conntrackTCPLiberal = true
   255  		klog.InfoS("nf_conntrack_tcp_be_liberal set, not installing DROP rules for INVALID packets")
   256  	}
   257  
   258  	if initOnly {
   259  		klog.InfoS("System initialized and --init-only specified")
   260  		return nil, nil
   261  	}
   262  
   263  	// Generate the masquerade mark to use for SNAT rules.
   264  	masqueradeValue := 1 << uint(masqueradeBit)
   265  	masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
   266  	klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
   267  
   268  	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
   269  
   270  	proxier := &Proxier{
   271  		ipFamily:                 ipFamily,
   272  		svcPortMap:               make(proxy.ServicePortMap),
   273  		serviceChanges:           proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
   274  		endpointsMap:             make(proxy.EndpointsMap),
   275  		endpointsChanges:         proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
   276  		needFullSync:             true,
   277  		syncPeriod:               syncPeriod,
   278  		iptables:                 ipt,
   279  		masqueradeAll:            masqueradeAll,
   280  		masqueradeMark:           masqueradeMark,
   281  		conntrack:                conntrack.NewExec(exec),
   282  		localDetector:            localDetector,
   283  		hostname:                 hostname,
   284  		nodeIP:                   nodeIP,
   285  		recorder:                 recorder,
   286  		serviceHealthServer:      serviceHealthServer,
   287  		healthzServer:            healthzServer,
   288  		precomputedProbabilities: make([]string, 0, 1001),
   289  		iptablesData:             bytes.NewBuffer(nil),
   290  		existingFilterChainsData: bytes.NewBuffer(nil),
   291  		filterChains:             proxyutil.NewLineBuffer(),
   292  		filterRules:              proxyutil.NewLineBuffer(),
   293  		natChains:                proxyutil.NewLineBuffer(),
   294  		natRules:                 proxyutil.NewLineBuffer(),
   295  		localhostNodePorts:       localhostNodePorts,
   296  		nodePortAddresses:        nodePortAddresses,
   297  		networkInterfacer:        proxyutil.RealNetwork{},
   298  		conntrackTCPLiberal:      conntrackTCPLiberal,
   299  	}
   300  
   301  	burstSyncs := 2
   302  	klog.V(2).InfoS("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
   303  	// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
   304  	// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
   305  	// time.Hour is arbitrary.
   306  	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
   307  
   308  	go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
   309  		proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)
   310  
   311  	if ipt.HasRandomFully() {
   312  		klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
   313  	} else {
   314  		klog.V(2).InfoS("Iptables does not support --random-fully", "ipFamily", ipt.Protocol())
   315  	}
   316  
   317  	return proxier, nil
   318  }
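As a rough illustration of the masquerade mark computed above: it is simply the bit at position masqueradeBit rendered as a zero-padded hex string, and the KUBE-MARK-MASQ / KUBE-POSTROUTING rules later set, match, and clear exactly that bit. A minimal sketch, assuming kube-proxy's usual default of masqueradeBit=14:

	package main

	import "fmt"

	func main() {
		masqueradeBit := 14                         // common default; any bit in 0-31 works the same way
		masqueradeValue := 1 << uint(masqueradeBit) // 16384 == 0x4000
		masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
		fmt.Println(masqueradeMark) // "0x00004000": the 0x4000 bit, zero-padded
	}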
   319  
   320  // internal struct for storing service port information
   321  type servicePortInfo struct {
   322  	*proxy.BaseServicePortInfo
   323  	// The following fields are computed and stored for performance reasons.
   324  	nameString             string
   325  	clusterPolicyChainName utiliptables.Chain
   326  	localPolicyChainName   utiliptables.Chain
   327  	firewallChainName      utiliptables.Chain
   328  	externalChainName      utiliptables.Chain
   329  }
   330  
   331  // returns a new proxy.ServicePort which abstracts a servicePortInfo
   332  func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
   333  	svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
   334  
   335  	// Store the following for performance reasons.
   336  	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
   337  	svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
   338  	protocol := strings.ToLower(string(svcPort.Protocol()))
   339  	svcPort.nameString = svcPortName.String()
   340  	svcPort.clusterPolicyChainName = servicePortPolicyClusterChain(svcPort.nameString, protocol)
   341  	svcPort.localPolicyChainName = servicePortPolicyLocalChainName(svcPort.nameString, protocol)
   342  	svcPort.firewallChainName = serviceFirewallChainName(svcPort.nameString, protocol)
   343  	svcPort.externalChainName = serviceExternalChainName(svcPort.nameString, protocol)
   344  
   345  	return svcPort
   346  }
   347  
   348  // internal struct for endpoints information
   349  type endpointInfo struct {
   350  	*proxy.BaseEndpointInfo
   351  
   352  	ChainName utiliptables.Chain
   353  }
   354  
   355  // returns a new proxy.Endpoint which abstracts an endpointInfo
   356  func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
   357  	return &endpointInfo{
   358  		BaseEndpointInfo: baseInfo,
   359  		ChainName:        servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()),
   360  	}
   361  }
   362  
   363  type iptablesJumpChain struct {
   364  	table     utiliptables.Table
   365  	dstChain  utiliptables.Chain
   366  	srcChain  utiliptables.Chain
   367  	comment   string
   368  	extraArgs []string
   369  }
   370  
   371  var iptablesJumpChains = []iptablesJumpChain{
   372  	{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
   373  	{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainForward, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
   374  	{utiliptables.TableFilter, kubeNodePortsChain, utiliptables.ChainInput, "kubernetes health check service ports", nil},
   375  	{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
   376  	{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
   377  	{utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
   378  	{utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainInput, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}},
   379  	{utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainOutput, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}},
   380  	{utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainForward, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}},
   381  	{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil},
   382  	{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil},
   383  	{utiliptables.TableNAT, kubePostroutingChain, utiliptables.ChainPostrouting, "kubernetes postrouting rules", nil},
   384  }
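For illustration (not part of this file), here is how one iptablesJumpChains entry is turned into rule arguments by the loop in syncProxyRules further down; the assembled arguments are what EnsureRule prepends to the built-in chain, roughly what you would get by running iptables -t filter -I INPUT ... by hand.

	package main

	import "fmt"

	func main() {
		// Mirror of the first entry above: filter table, INPUT chain -> KUBE-EXTERNAL-SERVICES.
		extraArgs := []string{"-m", "conntrack", "--ctstate", "NEW"}
		comment := "kubernetes externally-visible service portals"
		dstChain := "KUBE-EXTERNAL-SERVICES"

		// Assemble the arguments the same way syncProxyRules does before calling EnsureRule.
		args := append([]string{}, extraArgs...)
		args = append(args, "-m", "comment", "--comment", comment)
		args = append(args, "-j", dstChain)
		fmt.Println(args)

		// Roughly equivalent to:
		//   iptables -t filter -I INPUT -m conntrack --ctstate NEW \
		//     -m comment --comment "kubernetes externally-visible service portals" \
		//     -j KUBE-EXTERNAL-SERVICES
	}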
   385  
   386  // Duplicates of chains created in pkg/kubelet/kubelet_network_linux.go; we create these
   387  // on startup but do not delete them in CleanupLeftovers.
   388  var iptablesKubeletJumpChains = []iptablesJumpChain{
   389  	{utiliptables.TableFilter, kubeletFirewallChain, utiliptables.ChainInput, "", nil},
   390  	{utiliptables.TableFilter, kubeletFirewallChain, utiliptables.ChainOutput, "", nil},
   391  }
   392  
   393  // When chains get removed from iptablesJumpChains, add them here so they get cleaned up
   394  // on upgrade.
   395  var iptablesCleanupOnlyChains = []iptablesJumpChain{}
   396  
   397  // CleanupLeftovers removes all iptables rules and chains created by the Proxier.
   398  // It returns true if an error was encountered. Errors are logged.
   399  func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
   400  	// Unlink our chains
   401  	for _, jump := range append(iptablesJumpChains, iptablesCleanupOnlyChains...) {
   402  		args := append(jump.extraArgs,
   403  			"-m", "comment", "--comment", jump.comment,
   404  			"-j", string(jump.dstChain),
   405  		)
   406  		if err := ipt.DeleteRule(jump.table, jump.srcChain, args...); err != nil {
   407  			if !utiliptables.IsNotFoundError(err) {
   408  				klog.ErrorS(err, "Error removing pure-iptables proxy rule")
   409  				encounteredError = true
   410  			}
   411  		}
   412  	}
   413  
   414  	// Flush and remove all of our "-t nat" chains.
   415  	iptablesData := bytes.NewBuffer(nil)
   416  	if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil {
   417  		klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableNAT)
   418  		encounteredError = true
   419  	} else {
   420  		existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
   421  		natChains := proxyutil.NewLineBuffer()
   422  		natRules := proxyutil.NewLineBuffer()
   423  		natChains.Write("*nat")
   424  		// Start with chains we know we need to remove.
   425  		for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} {
   426  			if existingNATChains.Has(chain) {
   427  				chainString := string(chain)
   428  				natChains.Write(utiliptables.MakeChainLine(chain)) // flush
   429  				natRules.Write("-X", chainString)                  // delete
   430  			}
   431  		}
   432  		// Hunt for service and endpoint chains.
   433  		for chain := range existingNATChains {
   434  			chainString := string(chain)
   435  			if isServiceChainName(chainString) {
   436  				natChains.Write(utiliptables.MakeChainLine(chain)) // flush
   437  				natRules.Write("-X", chainString)                  // delete
   438  			}
   439  		}
   440  		natRules.Write("COMMIT")
   441  		natLines := append(natChains.Bytes(), natRules.Bytes()...)
   442  		// Write it.
   443  		err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
   444  		if err != nil {
   445  			klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableNAT)
   446  			metrics.IptablesRestoreFailuresTotal.Inc()
   447  			encounteredError = true
   448  		}
   449  	}
   450  
   451  	// Flush and remove all of our "-t filter" chains.
   452  	iptablesData.Reset()
   453  	if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil {
   454  		klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableFilter)
   455  		encounteredError = true
   456  	} else {
   457  		existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
   458  		filterChains := proxyutil.NewLineBuffer()
   459  		filterRules := proxyutil.NewLineBuffer()
   460  		filterChains.Write("*filter")
   461  		for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
   462  			if existingFilterChains.Has(chain) {
   463  				chainString := string(chain)
   464  				filterChains.Write(utiliptables.MakeChainLine(chain))
   465  				filterRules.Write("-X", chainString)
   466  			}
   467  		}
   468  		filterRules.Write("COMMIT")
   469  		filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
   470  		// Write it.
   471  		if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
   472  			klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableFilter)
   473  			metrics.IptablesRestoreFailuresTotal.Inc()
   474  			encounteredError = true
   475  		}
   476  	}
   477  	return encounteredError
   478  }
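The restore payload assembled above has a fixed shape: the table header, one declaration line per chain to flush, one -X rule per chain to delete, then COMMIT (NoFlushTables leaves all other chains alone). A rough sketch of that text, assuming only the three well-known NAT chains plus one hypothetical service chain exist:

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		// The KUBE-SVC- suffix is hypothetical; real names come from portProtoHash below.
		lines := []string{
			"*nat",
			":KUBE-SERVICES - [0:0]", // re-declaring a chain flushes it
			":KUBE-NODEPORTS - [0:0]",
			":KUBE-POSTROUTING - [0:0]",
			":KUBE-SVC-XXXXXXXXXXXXXXXX - [0:0]",
			"-X KUBE-SERVICES", // -X deletes the (now empty) chain
			"-X KUBE-NODEPORTS",
			"-X KUBE-POSTROUTING",
			"-X KUBE-SVC-XXXXXXXXXXXXXXXX",
			"COMMIT",
		}
		fmt.Println(strings.Join(lines, "\n"))
	}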
   479  
   480  func computeProbability(n int) string {
   481  	return fmt.Sprintf("%0.10f", 1.0/float64(n))
   482  }
   483  
   484  // This assumes proxier.mu is held
   485  func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) {
   486  	if len(proxier.precomputedProbabilities) == 0 {
   487  		proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>")
   488  	}
   489  	for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ {
   490  		proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i))
   491  	}
   492  }
   493  
   494  // This assumes proxier.mu is held
   495  func (proxier *Proxier) probability(n int) string {
   496  	if n >= len(proxier.precomputedProbabilities) {
   497  		proxier.precomputeProbabilities(n)
   498  	}
   499  	return proxier.precomputedProbabilities[n]
   500  }
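These strings are what the sync loop later combines with iptables' statistic match to spread traffic across endpoint chains; a minimal sketch of the values for a service with three endpoints, where the rules cascade (1/3, then 1/2 of the remainder, then everything left):

	package main

	import "fmt"

	// computeProbability mirrors the helper above: 1/n rendered with ten decimal places.
	func computeProbability(n int) string {
		return fmt.Sprintf("%0.10f", 1.0/float64(n))
	}

	func main() {
		fmt.Println(computeProbability(3)) // 0.3333333333
		fmt.Println(computeProbability(2)) // 0.5000000000
	}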
   501  
   502  // Sync is called to synchronize the proxier state to iptables as soon as possible.
   503  func (proxier *Proxier) Sync() {
   504  	if proxier.healthzServer != nil {
   505  		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
   506  	}
   507  	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
   508  	proxier.syncRunner.Run()
   509  }
   510  
   511  // SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
   512  func (proxier *Proxier) SyncLoop() {
   513  	// Update healthz timestamp at beginning in case Sync() never succeeds.
   514  	if proxier.healthzServer != nil {
   515  		proxier.healthzServer.Updated(proxier.ipFamily)
   516  	}
   517  
   518  	// synthesize "last change queued" time as the informers are syncing.
   519  	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
   520  	proxier.syncRunner.Loop(wait.NeverStop)
   521  }
   522  
   523  func (proxier *Proxier) setInitialized(value bool) {
   524  	var initialized int32
   525  	if value {
   526  		initialized = 1
   527  	}
   528  	atomic.StoreInt32(&proxier.initialized, initialized)
   529  }
   530  
   531  func (proxier *Proxier) isInitialized() bool {
   532  	return atomic.LoadInt32(&proxier.initialized) > 0
   533  }
   534  
   535  // OnServiceAdd is called whenever creation of a new service object
   536  // is observed.
   537  func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
   538  	proxier.OnServiceUpdate(nil, service)
   539  }
   540  
   541  // OnServiceUpdate is called whenever modification of an existing
   542  // service object is observed.
   543  func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
   544  	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
   545  		proxier.Sync()
   546  	}
   547  }
   548  
   549  // OnServiceDelete is called whenever deletion of an existing service
   550  // object is observed.
   551  func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
   552  	proxier.OnServiceUpdate(service, nil)
   553  
   554  }
   555  
   556  // OnServiceSynced is called once all the initial event handlers were
   557  // called and the state is fully propagated to local cache.
   558  func (proxier *Proxier) OnServiceSynced() {
   559  	proxier.mu.Lock()
   560  	proxier.servicesSynced = true
   561  	proxier.setInitialized(proxier.endpointSlicesSynced)
   562  	proxier.mu.Unlock()
   563  
   564  	// Sync unconditionally - this is called once per lifetime.
   565  	proxier.syncProxyRules()
   566  }
   567  
   568  // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
   569  // is observed.
   570  func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
   571  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
   572  		proxier.Sync()
   573  	}
   574  }
   575  
   576  // OnEndpointSliceUpdate is called whenever modification of an existing endpoint
   577  // slice object is observed.
   578  func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
   579  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
   580  		proxier.Sync()
   581  	}
   582  }
   583  
   584  // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
   585  // object is observed.
   586  func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
   587  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
   588  		proxier.Sync()
   589  	}
   590  }
   591  
   592  // OnEndpointSlicesSynced is called once all the initial event handlers were
   593  // called and the state is fully propagated to local cache.
   594  func (proxier *Proxier) OnEndpointSlicesSynced() {
   595  	proxier.mu.Lock()
   596  	proxier.endpointSlicesSynced = true
   597  	proxier.setInitialized(proxier.servicesSynced)
   598  	proxier.mu.Unlock()
   599  
   600  	// Sync unconditionally - this is called once per lifetime.
   601  	proxier.syncProxyRules()
   602  }
   603  
   604  // OnNodeAdd is called whenever creation of a new node object
   605  // is observed.
   606  func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
   607  	if node.Name != proxier.hostname {
   608  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
   609  			"eventNode", node.Name, "currentNode", proxier.hostname)
   610  		return
   611  	}
   612  
   613  	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
   614  		return
   615  	}
   616  
   617  	proxier.mu.Lock()
   618  	proxier.nodeLabels = map[string]string{}
   619  	for k, v := range node.Labels {
   620  		proxier.nodeLabels[k] = v
   621  	}
   622  	proxier.needFullSync = true
   623  	proxier.mu.Unlock()
   624  	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
   625  
   626  	proxier.Sync()
   627  }
   628  
   629  // OnNodeUpdate is called whenever modification of an existing
   630  // node object is observed.
   631  func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
   632  	if node.Name != proxier.hostname {
   633  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
   634  			"eventNode", node.Name, "currentNode", proxier.hostname)
   635  		return
   636  	}
   637  
   638  	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
   639  		return
   640  	}
   641  
   642  	proxier.mu.Lock()
   643  	proxier.nodeLabels = map[string]string{}
   644  	for k, v := range node.Labels {
   645  		proxier.nodeLabels[k] = v
   646  	}
   647  	proxier.needFullSync = true
   648  	proxier.mu.Unlock()
   649  	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
   650  
   651  	proxier.Sync()
   652  }
   653  
   654  // OnNodeDelete is called whenever deletion of an existing node
   655  // object is observed.
   656  func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
   657  	if node.Name != proxier.hostname {
   658  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
   659  			"eventNode", node.Name, "currentNode", proxier.hostname)
   660  		return
   661  	}
   662  
   663  	proxier.mu.Lock()
   664  	proxier.nodeLabels = nil
   665  	proxier.needFullSync = true
   666  	proxier.mu.Unlock()
   667  
   668  	proxier.Sync()
   669  }
   670  
   671  // OnNodeSynced is called once all the initial event handlers were
   672  // called and the state is fully propagated to local cache.
   673  func (proxier *Proxier) OnNodeSynced() {
   674  }
   675  
   676  // OnServiceCIDRsChanged is called whenever a change is observed
   677  // in any of the ServiceCIDRs, and provides the complete list of service CIDRs.
   678  func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
   679  
   680  // portProtoHash takes the ServicePortName and protocol for a service and
   681  // returns the associated 16 character hash. This is computed by hashing (sha256)
   682  // then encoding to base32 and truncating to 16 chars. We do this because iptables
   683  // chain names must be <= 28 chars long, and the longer they are the harder they are to read.
   684  func portProtoHash(servicePortName string, protocol string) string {
   685  	hash := sha256.Sum256([]byte(servicePortName + protocol))
   686  	encoded := base32.StdEncoding.EncodeToString(hash[:])
   687  	return encoded[:16]
   688  }
   689  
   690  const (
   691  	servicePortPolicyClusterChainNamePrefix = "KUBE-SVC-"
   692  	servicePortPolicyLocalChainNamePrefix   = "KUBE-SVL-"
   693  	serviceFirewallChainNamePrefix          = "KUBE-FW-"
   694  	serviceExternalChainNamePrefix          = "KUBE-EXT-"
   695  	servicePortEndpointChainNamePrefix      = "KUBE-SEP-"
   696  )
   697  
   698  // servicePortPolicyClusterChain returns the name of the KUBE-SVC-XXXX chain for a service, which is the
   699  // main iptables chain for that service, used for dispatching to endpoints when using `Cluster`
   700  // traffic policy.
   701  func servicePortPolicyClusterChain(servicePortName string, protocol string) utiliptables.Chain {
   702  	return utiliptables.Chain(servicePortPolicyClusterChainNamePrefix + portProtoHash(servicePortName, protocol))
   703  }
   704  
   705  // servicePortPolicyLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which
   706  // handles dispatching to local endpoints when using `Local` traffic policy. This chain only
   707  // exists if the service has `Local` internal or external traffic policy.
   708  func servicePortPolicyLocalChainName(servicePortName string, protocol string) utiliptables.Chain {
   709  	return utiliptables.Chain(servicePortPolicyLocalChainNamePrefix + portProtoHash(servicePortName, protocol))
   710  }
   711  
   712  // serviceFirewallChainName returns the name of the KUBE-FW-XXXX chain for a service, which
   713  // is used to implement the filtering for the LoadBalancerSourceRanges feature.
   714  func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain {
   715  	return utiliptables.Chain(serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol))
   716  }
   717  
   718  // serviceExternalChainName returns the name of the KUBE-EXT-XXXX chain for a service, which
   719  // implements "short-circuiting" for internally-originated external-destination traffic when using
   720  // `Local` external traffic policy.  It forwards traffic from local sources to the KUBE-SVC-XXXX
   721  // chain and traffic from external sources to the KUBE-SVL-XXXX chain.
   722  func serviceExternalChainName(servicePortName string, protocol string) utiliptables.Chain {
   723  	return utiliptables.Chain(serviceExternalChainNamePrefix + portProtoHash(servicePortName, protocol))
   724  }
   725  
   726  // servicePortEndpointChainName returns the name of the KUBE-SEP-XXXX chain for a particular
   727  // service endpoint.
   728  func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain {
   729  	hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint))
   730  	encoded := base32.StdEncoding.EncodeToString(hash[:])
   731  	return utiliptables.Chain(servicePortEndpointChainNamePrefix + encoded[:16])
   732  }
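All of these chain names follow the same pattern: a fixed prefix plus the first 16 characters of a base32-encoded SHA-256 digest, which keeps them stable across syncs and under iptables' 28-character chain-name limit. A minimal sketch, using a hypothetical service port "ns/my-svc:http" over tcp:

	package main

	import (
		"crypto/sha256"
		"encoding/base32"
		"fmt"
	)

	// portProtoHash mirrors the helper above.
	func portProtoHash(servicePortName, protocol string) string {
		hash := sha256.Sum256([]byte(servicePortName + protocol))
		return base32.StdEncoding.EncodeToString(hash[:])[:16]
	}

	func main() {
		// "ns/my-svc:http" and "tcp" are made-up inputs for illustration.
		chain := "KUBE-SVC-" + portProtoHash("ns/my-svc:http", "tcp")
		fmt.Println(chain, len(chain)) // 9-char prefix + 16-char hash = 25 chars, within the 28-char limit
	}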
   733  
   734  func isServiceChainName(chainString string) bool {
   735  	prefixes := []string{
   736  		servicePortPolicyClusterChainNamePrefix,
   737  		servicePortPolicyLocalChainNamePrefix,
   738  		servicePortEndpointChainNamePrefix,
   739  		serviceFirewallChainNamePrefix,
   740  		serviceExternalChainNamePrefix,
   741  	}
   742  
   743  	for _, p := range prefixes {
   744  		if strings.HasPrefix(chainString, p) {
   745  			return true
   746  		}
   747  	}
   748  	return false
   749  }
   750  
   751  // Assumes proxier.mu is held.
   752  func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string) []string {
   753  	// Not printing these comments can reduce the size of the iptables output by
   754  	// 40% or more when there are a large number of endpoints. So if the total
   755  	// number of endpoint chains is large enough, we simply drop those comments.
   756  	if proxier.largeClusterMode {
   757  		return args
   758  	}
   759  	return append(args, "-m", "comment", "--comment", svcName)
   760  }
   761  
   762  // Called by the iptables.Monitor, and in response to topology changes; this calls
   763  // syncProxyRules() and tells it to resync all services, regardless of whether the
   764  // Service or Endpoints/EndpointSlice objects themselves have changed.
   765  func (proxier *Proxier) forceSyncProxyRules() {
   766  	proxier.mu.Lock()
   767  	proxier.needFullSync = true
   768  	proxier.mu.Unlock()
   769  
   770  	proxier.syncProxyRules()
   771  }
   772  
   773  // This is where all of the iptables-save/restore calls happen.
   774  // The only other iptables rules are those that are set up in iptablesInit().
   775  // This assumes proxier.mu is NOT held.
   776  func (proxier *Proxier) syncProxyRules() {
   777  	proxier.mu.Lock()
   778  	defer proxier.mu.Unlock()
   779  
   780  	// don't sync rules till we've received services and endpoints
   781  	if !proxier.isInitialized() {
   782  		klog.V(2).InfoS("Not syncing iptables until Services and Endpoints have been received from master")
   783  		return
   784  	}
   785  
   786  	// The value of proxier.needFullSync may change before the defer funcs run, so
   787  	// we need to keep track of whether it was set at the *start* of the sync.
   788  	tryPartialSync := !proxier.needFullSync
   789  
   790  	// Keep track of how long syncs take.
   791  	start := time.Now()
   792  	defer func() {
   793  		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
   794  		if tryPartialSync {
   795  			metrics.SyncPartialProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
   796  		} else {
   797  			metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
   798  		}
   799  		klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
   800  	}()
   801  
   802  	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
   803  	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
   804  
   805  	klog.V(2).InfoS("Syncing iptables rules")
   806  
   807  	success := false
   808  	defer func() {
   809  		if !success {
   810  			klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
   811  			proxier.syncRunner.RetryAfter(proxier.syncPeriod)
   812  			if tryPartialSync {
   813  				metrics.IptablesPartialRestoreFailuresTotal.Inc()
   814  			}
   815  			// proxier.serviceChanges and proxier.endpointsChanges have already
   816  			// been flushed, so we've lost the state needed to be able to do
   817  			// a partial sync.
   818  			proxier.needFullSync = true
   819  		}
   820  	}()
   821  
   822  	if !tryPartialSync {
   823  		// Ensure that our jump rules (e.g. from PREROUTING to KUBE-SERVICES) exist.
   824  		// We can't do this as part of the iptables-restore because we don't want
   825  		// to specify/replace *all* of the rules in PREROUTING, etc.
   826  		//
   827  		// We need to create these rules when kube-proxy first starts, and we need
   828  		// to recreate them if the utiliptables Monitor detects that iptables has
   829  		// been flushed. In both of those cases, the code will force a full sync.
   830  		// In all other cases, it ought to be safe to assume that the rules
   831  		// already exist, so we'll skip this step when doing a partial sync, to
   832  		// save us from having to invoke /sbin/iptables 20 times on each sync
   833  		// (which will be very slow on hosts with lots of iptables rules).
   834  		for _, jump := range append(iptablesJumpChains, iptablesKubeletJumpChains...) {
   835  			if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
   836  				klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
   837  				return
   838  			}
   839  			args := jump.extraArgs
   840  			if jump.comment != "" {
   841  				args = append(args, "-m", "comment", "--comment", jump.comment)
   842  			}
   843  			args = append(args, "-j", string(jump.dstChain))
   844  			if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
   845  				klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
   846  				return
   847  			}
   848  		}
   849  	}
   850  
   851  	//
   852  	// Below this point we will not return until we try to write the iptables rules.
   853  	//
   854  
   855  	// Reset all buffers used later.
   856  	// This is to avoid memory reallocations and thus improve performance.
   857  	proxier.filterChains.Reset()
   858  	proxier.filterRules.Reset()
   859  	proxier.natChains.Reset()
   860  	proxier.natRules.Reset()
   861  
   862  	skippedNatChains := proxyutil.NewDiscardLineBuffer()
   863  	skippedNatRules := proxyutil.NewDiscardLineBuffer()
   864  
   865  	// Write chain lines for all the "top-level" chains we'll be filling in
   866  	for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeProxyFirewallChain} {
   867  		proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
   868  	}
   869  	for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} {
   870  		proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
   871  	}
   872  
   873  	// Install the kubernetes-specific postrouting rules. We use a whole chain for
   874  	// this so that it is easier to flush and change, for example if the mark
   875  	// value should ever change.
   876  
   877  	proxier.natRules.Write(
   878  		"-A", string(kubePostroutingChain),
   879  		"-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
   880  		"-j", "RETURN",
   881  	)
   882  	// Clear the mark to avoid re-masquerading if the packet re-traverses the network stack.
   883  	proxier.natRules.Write(
   884  		"-A", string(kubePostroutingChain),
   885  		"-j", "MARK", "--xor-mark", proxier.masqueradeMark,
   886  	)
   887  	masqRule := []string{
   888  		"-A", string(kubePostroutingChain),
   889  		"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
   890  		"-j", "MASQUERADE",
   891  	}
   892  	if proxier.iptables.HasRandomFully() {
   893  		masqRule = append(masqRule, "--random-fully")
   894  	}
   895  	proxier.natRules.Write(masqRule)
   896  
   897  	// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
   898  	// this so that it is easier to flush and change, for example if the mark
   899  	// value should ever change.
   900  	proxier.natRules.Write(
   901  		"-A", string(kubeMarkMasqChain),
   902  		"-j", "MARK", "--or-mark", proxier.masqueradeMark,
   903  	)
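	// For illustration: assuming the default masquerade mark (formatted as 0x00004000),
	// the Write calls above render to lines like the following in the restore input
	// (--random-fully only when the iptables binary supports it):
	//
	//	-A KUBE-POSTROUTING -m mark ! --mark 0x00004000/0x00004000 -j RETURN
	//	-A KUBE-POSTROUTING -j MARK --xor-mark 0x00004000
	//	-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE --random-fully
	//	-A KUBE-MARK-MASQ -j MARK --or-mark 0x00004000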
   904  
   905  	isIPv6 := proxier.iptables.IsIPv6()
   906  	if !isIPv6 && proxier.localhostNodePorts {
   907  		// Kube-proxy's use of `route_localnet` to enable NodePorts on localhost
   908  		// creates a security hole (https://issue.k8s.io/90259) which this
   909  		// iptables rule mitigates.
   910  
   911  		// NOTE: kubelet creates an identical copy of this rule. If you want to
   912  		// change this rule in the future, you MUST do so in a way that will
   913  		// interoperate correctly with skewed versions of the rule created by
   914  		// kubelet. (Actually, kubelet uses "--dst"/"--src" rather than "-d"/"-s"
   915  		// but that's just a command-line thing and results in the same rule being
   916  		// created in the kernel.)
   917  		proxier.filterChains.Write(utiliptables.MakeChainLine(kubeletFirewallChain))
   918  		proxier.filterRules.Write(
   919  			"-A", string(kubeletFirewallChain),
   920  			"-m", "comment", "--comment", `"block incoming localnet connections"`,
   921  			"-d", "127.0.0.0/8",
   922  			"!", "-s", "127.0.0.0/8",
   923  			"-m", "conntrack",
   924  			"!", "--ctstate", "RELATED,ESTABLISHED,DNAT",
   925  			"-j", "DROP",
   926  		)
   927  	}
   928  
   929  	// Accumulate NAT chains to keep.
   930  	activeNATChains := sets.New[utiliptables.Chain]()
   931  
   932  	// To avoid growing this slice, we arbitrarily set its size to 64;
   933  	// there are never more than that many arguments for a single line.
   934  	// Note that even if we go over 64, it will still be correct - it
   935  	// is just for efficiency, not correctness.
   936  	args := make([]string, 64)
   937  
   938  	// Compute total number of endpoint chains across all services
   939  	// to get a sense of how big the cluster is.
   940  	totalEndpoints := 0
   941  	for svcName := range proxier.svcPortMap {
   942  		totalEndpoints += len(proxier.endpointsMap[svcName])
   943  	}
   944  	proxier.largeClusterMode = (totalEndpoints > largeClusterEndpointsThreshold)
   945  
   946  	// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
   947  	// metric.
   948  	serviceNoLocalEndpointsTotalInternal := 0
   949  	serviceNoLocalEndpointsTotalExternal := 0
   950  
   951  	// Build rules for each service-port.
   952  	for svcName, svc := range proxier.svcPortMap {
   953  		svcInfo, ok := svc.(*servicePortInfo)
   954  		if !ok {
   955  			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
   956  			continue
   957  		}
   958  		protocol := strings.ToLower(string(svcInfo.Protocol()))
   959  		svcPortNameString := svcInfo.nameString
   960  
   961  		// Figure out the endpoints for Cluster and Local traffic policy.
   962  		// allLocallyReachableEndpoints is the set of all endpoints that can be routed to
   963  		// from this node, given the service's traffic policies. hasEndpoints is true
   964  		// if the service has any usable endpoints on any node, not just this one.
   965  		allEndpoints := proxier.endpointsMap[svcName]
   966  		clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
   967  
   968  		// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
   969  		clusterPolicyChain := svcInfo.clusterPolicyChainName
   970  		usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
   971  
   972  		// localPolicyChain contains the endpoints used with "Local" traffic policy
   973  		localPolicyChain := svcInfo.localPolicyChainName
   974  		usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
   975  
   976  		// internalPolicyChain is the chain containing the endpoints for
   977  		// "internal" (ClusterIP) traffic. internalTrafficChain is the chain that
   978  		// internal traffic is routed to (which is always the same as
   979  		// internalPolicyChain). hasInternalEndpoints is true if we should
   980  		// generate rules pointing to internalTrafficChain, or false if there are
   981  		// no available internal endpoints.
   982  		internalPolicyChain := clusterPolicyChain
   983  		hasInternalEndpoints := hasEndpoints
   984  		if svcInfo.InternalPolicyLocal() {
   985  			internalPolicyChain = localPolicyChain
   986  			if len(localEndpoints) == 0 {
   987  				hasInternalEndpoints = false
   988  			}
   989  		}
   990  		internalTrafficChain := internalPolicyChain
   991  
   992  		// Similarly, externalPolicyChain is the chain containing the endpoints
   993  		// for "external" (NodePort, LoadBalancer, and ExternalIP) traffic.
   994  		// externalTrafficChain is the chain that external traffic is routed to
   995  		// (which is always the service's "EXT" chain). hasExternalEndpoints is
   996  		// true if there are endpoints that will be reached by external traffic.
   997  		// (But we may still have to generate externalTrafficChain even if there
   998  		// are no external endpoints, to ensure that the short-circuit rules for
   999  		// local traffic are set up.)
  1000  		externalPolicyChain := clusterPolicyChain
  1001  		hasExternalEndpoints := hasEndpoints
  1002  		if svcInfo.ExternalPolicyLocal() {
  1003  			externalPolicyChain = localPolicyChain
  1004  			if len(localEndpoints) == 0 {
  1005  				hasExternalEndpoints = false
  1006  			}
  1007  		}
  1008  		externalTrafficChain := svcInfo.externalChainName // eventually jumps to externalPolicyChain
  1009  
  1010  		// usesExternalTrafficChain is based on hasEndpoints, not hasExternalEndpoints,
  1011  		// because we need the local-traffic-short-circuiting rules even when there
  1012  		// are no externally-usable endpoints.
  1013  		usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
  1014  
  1015  		// Traffic to LoadBalancer IPs can go directly to externalTrafficChain
  1016  		// unless LoadBalancerSourceRanges is in use in which case we will
  1017  		// create a firewall chain.
  1018  		loadBalancerTrafficChain := externalTrafficChain
  1019  		fwChain := svcInfo.firewallChainName
  1020  		usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
  1021  		if usesFWChain {
  1022  			loadBalancerTrafficChain = fwChain
  1023  		}
  1024  
  1025  		var internalTrafficFilterTarget, internalTrafficFilterComment string
  1026  		var externalTrafficFilterTarget, externalTrafficFilterComment string
  1027  		if !hasEndpoints {
  1028  			// The service has no endpoints at all; hasInternalEndpoints and
  1029  			// hasExternalEndpoints will also be false, and we will not
  1030  			// generate any chains in the "nat" table for the service; only
  1031  			// rules in the "filter" table rejecting incoming packets for
  1032  			// the service's IPs.
  1033  			internalTrafficFilterTarget = "REJECT"
  1034  			internalTrafficFilterComment = fmt.Sprintf(`"%s has no endpoints"`, svcPortNameString)
  1035  			externalTrafficFilterTarget = "REJECT"
  1036  			externalTrafficFilterComment = internalTrafficFilterComment
  1037  		} else {
  1038  			if !hasInternalEndpoints {
  1039  				// The internalTrafficPolicy is "Local" but there are no local
  1040  				// endpoints. Traffic to the clusterIP will be dropped, but
  1041  				// external traffic may still be accepted.
  1042  				internalTrafficFilterTarget = "DROP"
  1043  				internalTrafficFilterComment = fmt.Sprintf(`"%s has no local endpoints"`, svcPortNameString)
  1044  				serviceNoLocalEndpointsTotalInternal++
  1045  			}
  1046  			if !hasExternalEndpoints {
  1047  				// The externalTrafficPolicy is "Local" but there are no
  1048  				// local endpoints. Traffic to "external" IPs from outside
  1049  				// the cluster will be dropped, but traffic from inside
  1050  				// the cluster may still be accepted.
  1051  				externalTrafficFilterTarget = "DROP"
  1052  				externalTrafficFilterComment = fmt.Sprintf(`"%s has no local endpoints"`, svcPortNameString)
  1053  				serviceNoLocalEndpointsTotalExternal++
  1054  			}
  1055  		}
  1056  
  1057  		filterRules := proxier.filterRules
  1058  		natChains := proxier.natChains
  1059  		natRules := proxier.natRules
  1060  
  1061  		// Capture the clusterIP.
  1062  		if hasInternalEndpoints {
  1063  			natRules.Write(
  1064  				"-A", string(kubeServicesChain),
  1065  				"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
  1066  				"-m", protocol, "-p", protocol,
  1067  				"-d", svcInfo.ClusterIP().String(),
  1068  				"--dport", strconv.Itoa(svcInfo.Port()),
  1069  				"-j", string(internalTrafficChain))
  1070  		} else {
  1071  			// No endpoints.
  1072  			filterRules.Write(
  1073  				"-A", string(kubeServicesChain),
  1074  				"-m", "comment", "--comment", internalTrafficFilterComment,
  1075  				"-m", protocol, "-p", protocol,
  1076  				"-d", svcInfo.ClusterIP().String(),
  1077  				"--dport", strconv.Itoa(svcInfo.Port()),
  1078  				"-j", internalTrafficFilterTarget,
  1079  			)
  1080  		}
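		// For illustration, rendered into the restore input the clusterIP rule above looks
		// roughly like this (service name, IP, port and chain hash are hypothetical):
		//
		//	-A KUBE-SERVICES -m comment --comment "ns/my-svc:http cluster IP" -m tcp -p tcp -d 10.96.0.10 --dport 80 -j KUBE-SVC-XXXXXXXXXXXXXXXX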
  1081  
  1082  		// Capture externalIPs.
  1083  		for _, externalIP := range svcInfo.ExternalIPs() {
  1084  			if hasEndpoints {
  1085  				// Send traffic bound for external IPs to the "external
  1086  				// destinations" chain.
  1087  				natRules.Write(
  1088  					"-A", string(kubeServicesChain),
  1089  					"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString),
  1090  					"-m", protocol, "-p", protocol,
  1091  					"-d", externalIP.String(),
  1092  					"--dport", strconv.Itoa(svcInfo.Port()),
  1093  					"-j", string(externalTrafficChain))
  1094  			}
  1095  			if !hasExternalEndpoints {
  1096  				// Either no endpoints at all (REJECT) or no endpoints for
  1097  				// external traffic (DROP anything that didn't get
  1098  				// short-circuited by the EXT chain.)
  1099  				filterRules.Write(
  1100  					"-A", string(kubeExternalServicesChain),
  1101  					"-m", "comment", "--comment", externalTrafficFilterComment,
  1102  					"-m", protocol, "-p", protocol,
  1103  					"-d", externalIP.String(),
  1104  					"--dport", strconv.Itoa(svcInfo.Port()),
  1105  					"-j", externalTrafficFilterTarget,
  1106  				)
  1107  			}
  1108  		}
  1109  
  1110  		// Capture load-balancer ingress.
  1111  		for _, lbip := range svcInfo.LoadBalancerVIPs() {
  1112  			if hasEndpoints {
  1113  				natRules.Write(
  1114  					"-A", string(kubeServicesChain),
  1115  					"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
  1116  					"-m", protocol, "-p", protocol,
  1117  					"-d", lbip.String(),
  1118  					"--dport", strconv.Itoa(svcInfo.Port()),
  1119  					"-j", string(loadBalancerTrafficChain))
  1120  
  1121  			}
  1122  			if usesFWChain {
  1123  				filterRules.Write(
  1124  					"-A", string(kubeProxyFirewallChain),
  1125  					"-m", "comment", "--comment", fmt.Sprintf(`"%s traffic not accepted by %s"`, svcPortNameString, svcInfo.firewallChainName),
  1126  					"-m", protocol, "-p", protocol,
  1127  					"-d", lbip.String(),
  1128  					"--dport", strconv.Itoa(svcInfo.Port()),
  1129  					"-j", "DROP")
  1130  			}
  1131  		}
  1132  		if !hasExternalEndpoints {
  1133  			// Either no endpoints at all (REJECT) or no endpoints for
  1134  			// external traffic (DROP anything that didn't get short-circuited
  1135  			// by the EXT chain.)
  1136  			for _, lbip := range svcInfo.LoadBalancerVIPs() {
  1137  				filterRules.Write(
  1138  					"-A", string(kubeExternalServicesChain),
  1139  					"-m", "comment", "--comment", externalTrafficFilterComment,
  1140  					"-m", protocol, "-p", protocol,
  1141  					"-d", lbip.String(),
  1142  					"--dport", strconv.Itoa(svcInfo.Port()),
  1143  					"-j", externalTrafficFilterTarget,
  1144  				)
  1145  			}
  1146  		}
  1147  
  1148  		// Capture nodeports.
  1149  		if svcInfo.NodePort() != 0 {
  1150  			if hasEndpoints {
  1151  				// Jump to the external destination chain.  For better or for
  1152  				// worse, nodeports are not subject to loadBalancerSourceRanges,
  1153  				// and we can't change that.
  1154  				natRules.Write(
  1155  					"-A", string(kubeNodePortsChain),
  1156  					"-m", "comment", "--comment", svcPortNameString,
  1157  					"-m", protocol, "-p", protocol,
  1158  					"--dport", strconv.Itoa(svcInfo.NodePort()),
  1159  					"-j", string(externalTrafficChain))
  1160  			}
  1161  			if !hasExternalEndpoints {
  1162  				// Either no endpoints at all (REJECT) or no endpoints for
  1163  				// external traffic (DROP anything that didn't get
  1164  				// short-circuited by the EXT chain.)
  1165  				filterRules.Write(
  1166  					"-A", string(kubeExternalServicesChain),
  1167  					"-m", "comment", "--comment", externalTrafficFilterComment,
  1168  					"-m", "addrtype", "--dst-type", "LOCAL",
  1169  					"-m", protocol, "-p", protocol,
  1170  					"--dport", strconv.Itoa(svcInfo.NodePort()),
  1171  					"-j", externalTrafficFilterTarget,
  1172  				)
  1173  			}
  1174  		}
  1175  
  1176  		// Capture healthCheckNodePorts.
  1177  		if svcInfo.HealthCheckNodePort() != 0 {
  1178  			// No matter whether the node has local endpoints, healthCheckNodePorts
  1179  			// need a rule to accept the incoming connection.
  1180  			filterRules.Write(
  1181  				"-A", string(kubeNodePortsChain),
  1182  				"-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcPortNameString),
  1183  				"-m", "tcp", "-p", "tcp",
  1184  				"--dport", strconv.Itoa(svcInfo.HealthCheckNodePort()),
  1185  				"-j", "ACCEPT",
  1186  			)
  1187  		}
  1188  
  1189  		// If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync
  1190  		// then we can omit them from the restore input. However, we have to still
  1191  		// figure out how many chains we _would_ have written, to make the metrics
  1192  		// come out right, so we just compute them and throw them away.
  1193  		if tryPartialSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) {
  1194  			natChains = skippedNatChains
  1195  			natRules = skippedNatRules
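        			// Everything written for this service from here on lands in throwaway
        			// buffers: the lines are still counted for the metrics below, but they
        			// are never included in the iptables-restore input.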
  1196  		}
  1197  
  1198  		// Set up internal traffic handling.
  1199  		if hasInternalEndpoints {
  1200  			args = append(args[:0],
  1201  				"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
  1202  				"-m", protocol, "-p", protocol,
  1203  				"-d", svcInfo.ClusterIP().String(),
  1204  				"--dport", strconv.Itoa(svcInfo.Port()),
  1205  			)
  1206  			if proxier.masqueradeAll {
  1207  				natRules.Write(
  1208  					"-A", string(internalTrafficChain),
  1209  					args,
  1210  					"-j", string(kubeMarkMasqChain))
  1211  			} else if proxier.localDetector.IsImplemented() {
  1212  				// This masquerades off-cluster traffic to a service VIP. The
  1213  				// idea is that you can establish a static route for your
  1214  				// Service range, routing to any node, and that node will
  1215  				// bridge into the Service for you. Since that might bounce
  1216  				// off-node, we masquerade here.
  1217  				natRules.Write(
  1218  					"-A", string(internalTrafficChain),
  1219  					args,
  1220  					proxier.localDetector.IfNotLocal(),
  1221  					"-j", string(kubeMarkMasqChain))
  1222  			}
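        			// Illustrative only: with a hypothetical ClusterIP 10.96.0.10, TCP port
        			// 53, and a --cluster-cidr-based local detector, the second variant
        			// renders roughly as (internalTrafficChain is typically KUBE-SVC-<hash>):
        			//   -A KUBE-SVC-<hash> -m comment --comment "ns/svc:port cluster IP"
        			//     -m tcp -p tcp -d 10.96.0.10 --dport 53 ! -s <cluster-cidr> -j KUBE-MARK-MASQ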
  1223  		}
  1224  
  1225  		// Set up external traffic handling (if any "external" destinations are
  1226  		// enabled). All captured traffic for all external destinations should
  1227  		// jump to externalTrafficChain, which will handle some special cases and
  1228  		// then jump to externalPolicyChain.
  1229  		if usesExternalTrafficChain {
  1230  			natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))
  1231  			activeNATChains.Insert(externalTrafficChain)
  1232  
  1233  			if !svcInfo.ExternalPolicyLocal() {
  1234  				// If we are using non-local endpoints we need to masquerade,
  1235  				// in case we cross nodes.
  1236  				natRules.Write(
  1237  					"-A", string(externalTrafficChain),
  1238  					"-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString),
  1239  					"-j", string(kubeMarkMasqChain))
  1240  			} else {
  1241  				// If we are only using same-node endpoints, we can retain the
  1242  				// source IP in most cases.
  1243  
  1244  				if proxier.localDetector.IsImplemented() {
  1245  					// Treat all locally-originated pod -> external destination
  1246  					// traffic as a special-case.  It is subject to neither
  1247  					// form of traffic policy, which simulates going up-and-out
  1248  					// to an external load-balancer and coming back in.
  1249  					natRules.Write(
  1250  						"-A", string(externalTrafficChain),
  1251  						"-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString),
  1252  						proxier.localDetector.IfLocal(),
  1253  						"-j", string(clusterPolicyChain))
  1254  				}
  1255  
  1256  				// Locally originated traffic (not a pod, but the host node)
  1257  				// still needs masquerade because the LBIP itself is a local
  1258  				// address, so that will be the chosen source IP.
  1259  				natRules.Write(
  1260  					"-A", string(externalTrafficChain),
  1261  					"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString),
  1262  					"-m", "addrtype", "--src-type", "LOCAL",
  1263  					"-j", string(kubeMarkMasqChain))
  1264  
  1265  				// Redirect all src-type=LOCAL -> external destination to the
  1266  				// policy=cluster chain. This allows traffic originating
  1267  				// from the host to be redirected to the service correctly.
  1268  				natRules.Write(
  1269  					"-A", string(externalTrafficChain),
  1270  					"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString),
  1271  					"-m", "addrtype", "--src-type", "LOCAL",
  1272  					"-j", string(clusterPolicyChain))
  1273  			}
  1274  
  1275  			// Anything else falls thru to the appropriate policy chain.
  1276  			if hasExternalEndpoints {
  1277  				natRules.Write(
  1278  					"-A", string(externalTrafficChain),
  1279  					"-j", string(externalPolicyChain))
  1280  			}
  1281  		}
  1282  
  1283  		// Set up firewall chain, if needed
  1284  		if usesFWChain {
  1285  			natChains.Write(utiliptables.MakeChainLine(fwChain))
  1286  			activeNATChains.Insert(fwChain)
  1287  
  1288  			// The service firewall rules are created based on the
  1289  			// loadBalancerSourceRanges field. This only works for VIP-like
  1290  			// loadbalancers that preserve source IPs. For loadbalancers which
  1291  			// direct traffic to service NodePort, the firewall rules will not
  1292  			// apply.
  1293  			args = append(args[:0],
  1294  				"-A", string(fwChain),
  1295  				"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
  1296  			)
  1297  
  1298  			// firewall filter based on each source range
  1299  			allowFromNode := false
  1300  			for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
  1301  				natRules.Write(args, "-s", cidr.String(), "-j", string(externalTrafficChain))
  1302  				if cidr.Contains(proxier.nodeIP) {
  1303  					allowFromNode = true
  1304  				}
  1305  			}
  1306  			// For VIP-like LBs, the VIP is often added as a local
  1307  			// address (via an IP route rule).  In that case, a request
  1308  			// from a node to the VIP will not hit the loadbalancer but
  1309  			// will loop back with the source IP set to the VIP.  We
  1310  			// need the following rules to allow requests from this node.
  1311  			if allowFromNode {
  1312  				for _, lbip := range svcInfo.LoadBalancerVIPs() {
  1313  					natRules.Write(
  1314  						args,
  1315  						"-s", lbip.String(),
  1316  						"-j", string(externalTrafficChain))
  1317  				}
  1318  			}
  1319  			// If the packet was able to reach the end of firewall chain,
  1320  			// then it did not get DNATed, so it will match the
  1321  			// corresponding KUBE-PROXY-FIREWALL rule.
  1322  			natRules.Write(
  1323  				"-A", string(fwChain),
  1324  				"-m", "comment", "--comment", fmt.Sprintf(`"other traffic to %s will be dropped by KUBE-PROXY-FIREWALL"`, svcPortNameString),
  1325  			)
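        			// Illustrative only: with a hypothetical source range 203.0.113.0/24,
        			// the chain built above looks roughly like
        			//   -A KUBE-FW-<hash> -m comment --comment "ns/svc:port loadbalancer IP" -s 203.0.113.0/24 -j KUBE-EXT-<hash>
        			//   -A KUBE-FW-<hash> -m comment --comment "other traffic to ns/svc:port will be dropped by KUBE-PROXY-FIREWALL"
        			// A packet that reaches the final rule leaves the chain without being
        			// DNATed, and so later matches the KUBE-PROXY-FIREWALL DROP rule that
        			// was written to the filter table above.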
  1326  		}
  1327  
  1328  		// If Cluster policy is in use, create the chain and create rules jumping
  1329  		// from clusterPolicyChain to the clusterEndpoints
  1330  		if usesClusterPolicyChain {
  1331  			natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
  1332  			activeNATChains.Insert(clusterPolicyChain)
  1333  			proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
  1334  		}
  1335  
  1336  		// If Local policy is in use, create the chain and create rules jumping
  1337  		// from localPolicyChain to the localEndpoints
  1338  		if usesLocalPolicyChain {
  1339  			natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
  1340  			activeNATChains.Insert(localPolicyChain)
  1341  			proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args)
  1342  		}
  1343  
  1344  		// Generate the per-endpoint chains.
  1345  		for _, ep := range allLocallyReachableEndpoints {
  1346  			epInfo, ok := ep.(*endpointInfo)
  1347  			if !ok {
  1348  				klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
  1349  				continue
  1350  			}
  1351  
  1352  			endpointChain := epInfo.ChainName
  1353  
  1354  			// Create the endpoint chain
  1355  			natChains.Write(utiliptables.MakeChainLine(endpointChain))
  1356  			activeNATChains.Insert(endpointChain)
  1357  
  1358  			args = append(args[:0], "-A", string(endpointChain))
  1359  			args = proxier.appendServiceCommentLocked(args, svcPortNameString)
  1360  			// Handle traffic that loops back to the originator with SNAT.
  1361  			natRules.Write(
  1362  				args,
  1363  				"-s", epInfo.IP(),
  1364  				"-j", string(kubeMarkMasqChain))
  1365  			// Update client-affinity lists.
  1366  			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1367  				args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
  1368  			}
  1369  			// DNAT to final destination.
  1370  			args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.String())
  1371  			natRules.Write(args)
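        			// Illustrative only: for a hypothetical TCP endpoint 10.0.1.5:8080 the
        			// chain ends up roughly as
        			//   -A KUBE-SEP-<hash> ... -s 10.0.1.5 -j KUBE-MARK-MASQ
        			//   -A KUBE-SEP-<hash> ... -m tcp -p tcp -j DNAT --to-destination 10.0.1.5:8080
        			// where the first rule masquerades hairpin traffic (an endpoint talking
        			// to its own Service and being load-balanced back to itself).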
  1372  		}
  1373  	}
  1374  
  1375  	// Delete chains no longer in use. Since "iptables-save" can take several seconds
  1376  	// to run on hosts with lots of iptables rules, we don't bother to do this on
  1377  	// every sync in large clusters. (Stale chains will not be referenced by any
  1378  	// active rules, so they're harmless other than taking up memory.)
  1379  	deletedChains := 0
  1380  	if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod {
  1381  		proxier.iptablesData.Reset()
  1382  		if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil {
  1383  			existingNATChains := utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
  1384  			for chain := range existingNATChains.Difference(activeNATChains) {
  1385  				chainString := string(chain)
  1386  				if !isServiceChainName(chainString) {
  1387  					// Ignore chains that aren't ours.
  1388  					continue
  1389  				}
  1390  				// We must (as per iptables) write a chain-line
  1391  				// for it, which has the nice effect of flushing
  1392  				// the chain. Then we can remove the chain.
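        				// For a hypothetical stale chain KUBE-SVC-<hash>, this adds lines like
        				//   :KUBE-SVC-<hash> - [0:0]
        				//   -X KUBE-SVC-<hash>
        				// to the restore input: the chain line flushes it, "-X" then deletes it.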
  1393  				proxier.natChains.Write(utiliptables.MakeChainLine(chain))
  1394  				proxier.natRules.Write("-X", chainString)
  1395  				deletedChains++
  1396  			}
  1397  			proxier.lastIPTablesCleanup = time.Now()
  1398  		} else {
  1399  			klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
  1400  		}
  1401  	}
  1402  
  1403  	// Finally, tail-call to the nodePorts chain.  This needs to be after all
  1404  	// other service portal rules.
  1405  	if proxier.nodePortAddresses.MatchAll() {
  1406  		destinations := []string{"-m", "addrtype", "--dst-type", "LOCAL"}
  1407  		// Block localhost nodePorts if they are not supported. (For IPv6 they never
  1408  		// work, and for IPv4 they only work if we previously set `route_localnet`.)
  1409  		if isIPv6 {
  1410  			destinations = append(destinations, "!", "-d", "::1/128")
  1411  		} else if !proxier.localhostNodePorts {
  1412  			destinations = append(destinations, "!", "-d", "127.0.0.0/8")
  1413  		}
  1414  
  1415  		proxier.natRules.Write(
  1416  			"-A", string(kubeServicesChain),
  1417  			"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
  1418  			destinations,
  1419  			"-j", string(kubeNodePortsChain))
  1420  	} else {
  1421  		nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
  1422  		if err != nil {
  1423  			klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
  1424  		}
  1425  		for _, ip := range nodeIPs {
  1426  			if ip.IsLoopback() {
  1427  				if isIPv6 {
  1428  					klog.ErrorS(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported on IPv6", "address", ip.String())
  1429  					continue
  1430  				} else if !proxier.localhostNodePorts {
  1431  					klog.ErrorS(nil, "--nodeport-addresses includes localhost but --iptables-localhost-nodeports=false was passed", "address", ip.String())
  1432  					continue
  1433  				}
  1434  			}
  1435  
  1436  			// create nodeport rules for each IP one by one
  1437  			proxier.natRules.Write(
  1438  				"-A", string(kubeServicesChain),
  1439  				"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
  1440  				"-d", ip.String(),
  1441  				"-j", string(kubeNodePortsChain))
  1442  		}
  1443  	}
  1444  
  1445  	// Drop the packets in INVALID state, which would potentially cause
  1446  	// unexpected connection reset if nf_conntrack_tcp_be_liberal is not set.
  1447  	// Ref: https://github.com/kubernetes/kubernetes/issues/74839
  1448  	// Ref: https://github.com/kubernetes/kubernetes/issues/117924
  1449  	if !proxier.conntrackTCPLiberal {
  1450  		proxier.filterRules.Write(
  1451  			"-A", string(kubeForwardChain),
  1452  			"-m", "conntrack",
  1453  			"--ctstate", "INVALID",
  1454  			"-j", "DROP",
  1455  		)
  1456  	}
  1457  
  1458  	// If the masqueradeMark has been added then we want to forward that same
  1459  	// traffic; this allows NodePort traffic to be forwarded even if the default
  1460  	// FORWARD policy is not ACCEPT.
  1461  	proxier.filterRules.Write(
  1462  		"-A", string(kubeForwardChain),
  1463  		"-m", "comment", "--comment", `"kubernetes forwarding rules"`,
  1464  		"-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
  1465  		"-j", "ACCEPT",
  1466  	)
  1467  
  1468  	// The following rule ensures that traffic on connections whose initial packet
  1469  	// was accepted by the "kubernetes forwarding rules" rule above is also accepted.
  1470  	proxier.filterRules.Write(
  1471  		"-A", string(kubeForwardChain),
  1472  		"-m", "comment", "--comment", `"kubernetes forwarding conntrack rule"`,
  1473  		"-m", "conntrack",
  1474  		"--ctstate", "RELATED,ESTABLISHED",
  1475  		"-j", "ACCEPT",
  1476  	)
  1477  
  1478  	metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
  1479  	metrics.IptablesRulesLastSync.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
  1480  	metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() + skippedNatRules.Lines() - deletedChains))
  1481  	metrics.IptablesRulesLastSync.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() - deletedChains))
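        	// Note on the accounting above: the NAT "total" also counts the rules that
        	// were diverted into the skipped buffers for unchanged services (they still
        	// exist in the kernel), while the "-X" chain-deletion lines written above
        	// are not rules, so deletedChains is subtracted from both NAT gauges.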
  1482  
  1483  	// Sync rules.
  1484  	proxier.iptablesData.Reset()
  1485  	proxier.iptablesData.WriteString("*filter\n")
  1486  	proxier.iptablesData.Write(proxier.filterChains.Bytes())
  1487  	proxier.iptablesData.Write(proxier.filterRules.Bytes())
  1488  	proxier.iptablesData.WriteString("COMMIT\n")
  1489  	proxier.iptablesData.WriteString("*nat\n")
  1490  	proxier.iptablesData.Write(proxier.natChains.Bytes())
  1491  	proxier.iptablesData.Write(proxier.natRules.Bytes())
  1492  	proxier.iptablesData.WriteString("COMMIT\n")
  1493  
  1494  	klog.V(2).InfoS("Reloading service iptables data",
  1495  		"numServices", len(proxier.svcPortMap),
  1496  		"numEndpoints", totalEndpoints,
  1497  		"numFilterChains", proxier.filterChains.Lines(),
  1498  		"numFilterRules", proxier.filterRules.Lines(),
  1499  		"numNATChains", proxier.natChains.Lines(),
  1500  		"numNATRules", proxier.natRules.Lines(),
  1501  	)
  1502  	klog.V(9).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
  1503  
  1504  	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
  1505  	err := proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
  1506  	if err != nil {
  1507  		if pErr, ok := err.(utiliptables.ParseError); ok {
  1508  			lines := utiliptables.ExtractLines(proxier.iptablesData.Bytes(), pErr.Line(), 3)
  1509  			klog.ErrorS(pErr, "Failed to execute iptables-restore", "rules", lines)
  1510  		} else {
  1511  			klog.ErrorS(err, "Failed to execute iptables-restore")
  1512  		}
  1513  		metrics.IptablesRestoreFailuresTotal.Inc()
  1514  		return
  1515  	}
  1516  	success = true
  1517  	proxier.needFullSync = false
  1518  
  1519  	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
  1520  		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
  1521  			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
  1522  			metrics.NetworkProgrammingLatency.Observe(latency)
  1523  			klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
  1524  		}
  1525  	}
  1526  
  1527  	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
  1528  	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
  1529  	if proxier.healthzServer != nil {
  1530  		proxier.healthzServer.Updated(proxier.ipFamily)
  1531  	}
  1532  	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
  1533  
  1534  	// Update service healthchecks.  The endpoints list might include services that are
  1535  	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
  1536  	// will just drop those endpoints.
  1537  	if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
  1538  		klog.ErrorS(err, "Error syncing healthcheck services")
  1539  	}
  1540  	if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
  1541  		klog.ErrorS(err, "Error syncing healthcheck endpoints")
  1542  	}
  1543  
  1544  	// Finish housekeeping: clear stale conntrack entries for UDP Services.
  1545  	conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
  1546  }
  1547  
  1548  func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
  1549  	// First write session affinity rules, if applicable.
  1550  	if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1551  		for _, ep := range endpoints {
  1552  			epInfo, ok := ep.(*endpointInfo)
  1553  			if !ok {
  1554  				continue
  1555  			}
  1556  			comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())
  1557  
  1558  			args = append(args[:0],
  1559  				"-A", string(svcChain),
  1560  			)
  1561  			args = proxier.appendServiceCommentLocked(args, comment)
  1562  			args = append(args,
  1563  				"-m", "recent", "--name", string(epInfo.ChainName),
  1564  				"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
  1565  				"-j", string(epInfo.ChainName),
  1566  			)
  1567  			natRules.Write(args)
  1568  		}
  1569  	}
  1570  
  1571  	// Now write loadbalancing rules.
  1572  	numEndpoints := len(endpoints)
  1573  	for i, ep := range endpoints {
  1574  		epInfo, ok := ep.(*endpointInfo)
  1575  		if !ok {
  1576  			continue
  1577  		}
  1578  		comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())
  1579  
  1580  		args = append(args[:0], "-A", string(svcChain))
  1581  		args = proxier.appendServiceCommentLocked(args, comment)
  1582  		if i < (numEndpoints - 1) {
  1583  			// Each rule is a probabilistic match.
  1584  			args = append(args,
  1585  				"-m", "statistic",
  1586  				"--mode", "random",
  1587  				"--probability", proxier.probability(numEndpoints-i))
  1588  		}
  1589  		// The final (or only if n == 1) rule is a guaranteed match.
  1590  		natRules.Write(args, "-j", string(epInfo.ChainName))
  1591  	}
  1592  }
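        // A note on the load-balancing math above: assuming proxier.probability(k)
        // renders the statistic match probability as 1/k, rule i (0-based) of n
        // matches with probability 1/(n-i) and is reached only if no earlier rule
        // matched, which happens with probability (n-i)/n. Each endpoint is thus
        // selected with overall probability (n-i)/n * 1/(n-i) = 1/n, i.e. the
        // endpoints are chosen uniformly at random.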
  1593  