...

Source file src/k8s.io/kubernetes/pkg/proxy/nftables/proxier.go

Documentation: k8s.io/kubernetes/pkg/proxy/nftables

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2015 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package nftables
    21  
    22  //
    23  // NOTE: this needs to be tested in e2e since it uses nftables for everything.
    24  //
    25  
    26  import (
    27  	"context"
    28  	"crypto/sha256"
    29  	"encoding/base32"
    30  	"fmt"
    31  	"net"
    32  	"reflect"
    33  	"strconv"
    34  	"strings"
    35  	"sync"
    36  	"sync/atomic"
    37  	"time"
    38  
    39  	v1 "k8s.io/api/core/v1"
    40  	discovery "k8s.io/api/discovery/v1"
    41  	"k8s.io/apimachinery/pkg/types"
    42  	"k8s.io/apimachinery/pkg/util/sets"
    43  	"k8s.io/apimachinery/pkg/util/wait"
    44  	"k8s.io/client-go/tools/events"
    45  	utilsysctl "k8s.io/component-helpers/node/util/sysctl"
    46  	"k8s.io/klog/v2"
    47  	"k8s.io/kubernetes/pkg/proxy"
    48  	"k8s.io/kubernetes/pkg/proxy/conntrack"
    49  	"k8s.io/kubernetes/pkg/proxy/healthcheck"
    50  	"k8s.io/kubernetes/pkg/proxy/metaproxier"
    51  	"k8s.io/kubernetes/pkg/proxy/metrics"
    52  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    53  	proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
    54  	"k8s.io/kubernetes/pkg/util/async"
    55  	utilexec "k8s.io/utils/exec"
    56  	netutils "k8s.io/utils/net"
    57  	"k8s.io/utils/ptr"
    58  	"sigs.k8s.io/knftables"
    59  )
    60  
// Names of the nftables objects (table, chains, sets and maps) used by this proxier.
const (
	// Our nftables table. All of our chains/sets/maps are created inside this table,
	// so they don't need any "kube-" or "kube-proxy-" prefix of their own.
	kubeProxyTable = "kube-proxy"

	// base chains (see nftablesBaseChains for the hook and priority of each one)
	filterPreroutingChain     = "filter-prerouting"
	filterInputChain          = "filter-input"
	filterForwardChain        = "filter-forward"
	filterOutputChain         = "filter-output"
	filterOutputPostDNATChain = "filter-output-post-dnat"
	natPreroutingChain        = "nat-prerouting"
	natOutputChain            = "nat-output"
	natPostroutingChain       = "nat-postrouting"

	// service dispatch: the services chain looks packets up in the two verdict maps
	servicesChain       = "services"
	serviceIPsMap       = "service-ips"
	serviceNodePortsMap = "service-nodeports"

	// set of IPs that accept NodePort traffic
	nodePortIPsSet = "nodeport-ips"

	// set of active ClusterIPs.
	clusterIPsSet = "cluster-ips"

	// handling for services with no endpoints
	serviceEndpointsCheckChain  = "service-endpoints-check"
	nodePortEndpointsCheckChain = "nodeport-endpoints-check"
	noEndpointServicesMap       = "no-endpoint-services"
	noEndpointNodePortsMap      = "no-endpoint-nodeports"
	rejectChain                 = "reject-chain"

	// handling traffic to unallocated ClusterIPs and undefined ports of ClusterIPs
	clusterIPsCheckChain = "cluster-ips-check"

	// LoadBalancerSourceRanges handling
	firewallIPsMap     = "firewall-ips"
	firewallCheckChain = "firewall-check"

	// masquerading
	markMasqChain     = "mark-for-masquerade"
	masqueradingChain = "masquerading"
)
   105  
   106  // NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
   107  func NewDualStackProxier(
   108  	sysctl utilsysctl.Interface,
   109  	syncPeriod time.Duration,
   110  	minSyncPeriod time.Duration,
   111  	masqueradeAll bool,
   112  	masqueradeBit int,
   113  	localDetectors [2]proxyutiliptables.LocalTrafficDetector,
   114  	hostname string,
   115  	nodeIPs map[v1.IPFamily]net.IP,
   116  	recorder events.EventRecorder,
   117  	healthzServer *healthcheck.ProxierHealthServer,
   118  	nodePortAddresses []string,
   119  	initOnly bool,
   120  ) (proxy.Provider, error) {
   121  	// Create an ipv4 instance of the single-stack proxier
   122  	ipv4Proxier, err := NewProxier(v1.IPv4Protocol, sysctl,
   123  		syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname,
   124  		nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
   125  	if err != nil {
   126  		return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
   127  	}
   128  
   129  	ipv6Proxier, err := NewProxier(v1.IPv6Protocol, sysctl,
   130  		syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname,
   131  		nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
   132  	if err != nil {
   133  		return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
   134  	}
   135  	if initOnly {
   136  		return nil, nil
   137  	}
   138  	return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
   139  }
   140  
// Proxier is an nftables based proxy
type Proxier struct {
	// ipFamily defines the IP family which this proxier is tracking.
	ipFamily v1.IPFamily

	// endpointsChanges and serviceChanges contains all changes to endpoints and
	// services that happened since nftables was synced. For a single object,
	// changes are accumulated, i.e. previous is state from before all of them,
	// current is state after applying all of those.
	endpointsChanges *proxy.EndpointsChangeTracker
	serviceChanges   *proxy.ServiceChangeTracker

	mu           sync.Mutex // protects the following fields
	svcPortMap   proxy.ServicePortMap
	endpointsMap proxy.EndpointsMap
	// nodeLabels is a private copy of the labels of the node this proxier runs on,
	// refreshed by the OnNode* handlers.
	nodeLabels map[string]string
	// endpointSlicesSynced, and servicesSynced are set to true
	// when corresponding objects are synced after startup. This is used to avoid
	// updating nftables with some partial data after kube-proxy restart.
	endpointSlicesSynced bool
	servicesSynced       bool
	// initialized is accessed atomically through setInitialized / isInitialized;
	// a value greater than zero means "initialized".
	initialized int32
	syncRunner  *async.BoundedFrequencyRunner // governs calls to syncProxyRules
	syncPeriod  time.Duration
	// flushed records that setupNFTables has already queued its one-time
	// "add+delete" of the base chains.
	flushed bool

	// These are effectively const and do not need the mutex to be held.
	nftables      knftables.Interface
	masqueradeAll bool
	// masqueradeMark is the packet mark (formatted as a hex string) used by the
	// mark-for-masquerade and masquerading chains.
	masqueradeMark string
	conntrack      conntrack.Interface
	localDetector  proxyutiliptables.LocalTrafficDetector
	hostname       string
	nodeIP         net.IP
	recorder       events.EventRecorder

	serviceHealthServer healthcheck.ServiceHealthServer
	// healthzServer may be nil; Sync and SyncLoop check for that before using it.
	healthzServer *healthcheck.ProxierHealthServer

	// nodePortAddresses selects the interfaces where nodePort works.
	nodePortAddresses *proxyutil.NodePortAddresses
	// networkInterfacer defines an interface for several net library functions.
	// Inject for test purpose.
	networkInterfacer proxyutil.NetworkInterfacer

	// staleChains contains information about chains to be deleted later
	staleChains map[string]time.Time

	// serviceCIDRs is a comma separated list of ServiceCIDRs belonging to the IPFamily
	// which proxier is operating on, can be directly consumed by knftables.
	serviceCIDRs string
}
   193  
   194  // Proxier implements proxy.Provider
   195  var _ proxy.Provider = &Proxier{}
   196  
   197  // NewProxier returns a new nftables Proxier. Once a proxier is created, it will keep
   198  // nftables up to date in the background and will not terminate if a particular nftables
   199  // call fails.
   200  func NewProxier(ipFamily v1.IPFamily,
   201  	sysctl utilsysctl.Interface,
   202  	syncPeriod time.Duration,
   203  	minSyncPeriod time.Duration,
   204  	masqueradeAll bool,
   205  	masqueradeBit int,
   206  	localDetector proxyutiliptables.LocalTrafficDetector,
   207  	hostname string,
   208  	nodeIP net.IP,
   209  	recorder events.EventRecorder,
   210  	healthzServer *healthcheck.ProxierHealthServer,
   211  	nodePortAddressStrings []string,
   212  	initOnly bool,
   213  ) (*Proxier, error) {
   214  	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings, nodeIP)
   215  
   216  	if initOnly {
   217  		klog.InfoS("System initialized and --init-only specified")
   218  		return nil, nil
   219  	}
   220  
   221  	// Generate the masquerade mark to use for SNAT rules.
   222  	masqueradeValue := 1 << uint(masqueradeBit)
   223  	masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
   224  	klog.V(2).InfoS("Using nftables mark for masquerade", "ipFamily", ipFamily, "mark", masqueradeMark)
   225  
   226  	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
   227  
   228  	var nftablesFamily knftables.Family
   229  	if ipFamily == v1.IPv4Protocol {
   230  		nftablesFamily = knftables.IPv4Family
   231  	} else {
   232  		nftablesFamily = knftables.IPv6Family
   233  	}
   234  	nft, err := knftables.New(nftablesFamily, kubeProxyTable)
   235  	if err != nil {
   236  		return nil, err
   237  	}
   238  
   239  	proxier := &Proxier{
   240  		ipFamily:            ipFamily,
   241  		svcPortMap:          make(proxy.ServicePortMap),
   242  		serviceChanges:      proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
   243  		endpointsMap:        make(proxy.EndpointsMap),
   244  		endpointsChanges:    proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
   245  		syncPeriod:          syncPeriod,
   246  		nftables:            nft,
   247  		masqueradeAll:       masqueradeAll,
   248  		masqueradeMark:      masqueradeMark,
   249  		conntrack:           conntrack.NewExec(utilexec.New()),
   250  		localDetector:       localDetector,
   251  		hostname:            hostname,
   252  		nodeIP:              nodeIP,
   253  		recorder:            recorder,
   254  		serviceHealthServer: serviceHealthServer,
   255  		healthzServer:       healthzServer,
   256  		nodePortAddresses:   nodePortAddresses,
   257  		networkInterfacer:   proxyutil.RealNetwork{},
   258  		staleChains:         make(map[string]time.Time),
   259  	}
   260  
   261  	burstSyncs := 2
   262  	klog.V(2).InfoS("NFTables sync params", "ipFamily", ipFamily, "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
   263  	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
   264  
   265  	return proxier, nil
   266  }
   267  
// servicePortInfo is the internal struct for storing service information. It embeds
// the shared proxy.BaseServicePortInfo and adds names derived from it by
// newServiceInfo.
type servicePortInfo struct {
	*proxy.BaseServicePortInfo
	// The following fields are computed and stored for performance reasons.

	// nameString is the String() form of the proxy.ServicePortName.
	nameString string
	// The chain names below are each a fixed prefix followed by the same
	// per-service-port chain name base.
	clusterPolicyChainName string
	localPolicyChainName   string
	externalChainName      string
	firewallChainName      string
}
   278  
   279  // returns a new proxy.ServicePort which abstracts a serviceInfo
   280  func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
   281  	svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
   282  
   283  	// Store the following for performance reasons.
   284  	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
   285  	svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
   286  	svcPort.nameString = svcPortName.String()
   287  
   288  	chainNameBase := servicePortChainNameBase(&svcPortName, strings.ToLower(string(svcPort.Protocol())))
   289  	svcPort.clusterPolicyChainName = servicePortPolicyClusterChainNamePrefix + chainNameBase
   290  	svcPort.localPolicyChainName = servicePortPolicyLocalChainNamePrefix + chainNameBase
   291  	svcPort.externalChainName = serviceExternalChainNamePrefix + chainNameBase
   292  	svcPort.firewallChainName = servicePortFirewallChainNamePrefix + chainNameBase
   293  
   294  	return svcPort
   295  }
   296  
// endpointInfo is the internal struct for endpoints information. It embeds the shared
// proxy.BaseEndpointInfo and adds two names computed by newEndpointInfo.
type endpointInfo struct {
	*proxy.BaseEndpointInfo

	// chainName and affinitySetName are each a fixed prefix followed by the same
	// per-endpoint chain name base.
	chainName       string
	affinitySetName string
}
   304  
   305  // returns a new proxy.Endpoint which abstracts a endpointInfo
   306  func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
   307  	chainNameBase := servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String())
   308  	return &endpointInfo{
   309  		BaseEndpointInfo: baseInfo,
   310  		chainName:        servicePortEndpointChainNamePrefix + chainNameBase,
   311  		affinitySetName:  servicePortEndpointAffinityNamePrefix + chainNameBase,
   312  	}
   313  }
   314  
// nftablesBaseChain describes one of our "base chains"; those that are directly
// connected to the netfilter hooks (e.g., "postrouting", "input", etc.), as opposed to
// "regular" chains, which are only run when a rule jumps to them. See
// https://wiki.nftables.org/wiki-nftables/index.php/Configuring_chains.
//
// The full list is in nftablesBaseChains. These are set up from setupNFTables() and
// then not directly referenced by syncProxyRules().
//
// All of our base chains have names that are just "${type}-${hook}". e.g., "nat-prerouting".
type nftablesBaseChain struct {
	name      string                      // chain name within the kube-proxy table
	chainType knftables.BaseChainType     // e.g. filter or nat
	hook      knftables.BaseChainHook     // netfilter hook the chain is attached to
	priority  knftables.BaseChainPriority // ordering relative to other chains on the hook
}
   330  
   331  var nftablesBaseChains = []nftablesBaseChain{
   332  	// We want our filtering rules to operate on pre-DNAT dest IPs, so our filter
   333  	// chains have to run before DNAT.
   334  	{filterPreroutingChain, knftables.FilterType, knftables.PreroutingHook, knftables.DNATPriority + "-10"},
   335  	{filterInputChain, knftables.FilterType, knftables.InputHook, knftables.DNATPriority + "-10"},
   336  	{filterForwardChain, knftables.FilterType, knftables.ForwardHook, knftables.DNATPriority + "-10"},
   337  	{filterOutputChain, knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "-10"},
   338  	{filterOutputPostDNATChain, knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "+10"},
   339  	{natPreroutingChain, knftables.NATType, knftables.PreroutingHook, knftables.DNATPriority},
   340  	{natOutputChain, knftables.NATType, knftables.OutputHook, knftables.DNATPriority},
   341  	{natPostroutingChain, knftables.NATType, knftables.PostroutingHook, knftables.SNATPriority},
   342  }
   343  
// nftablesJumpChain describes one of our top-level "regular chains" that is jumped to
// directly from one of the base chains. The full list is in nftablesJumpChains. These
// are set up from setupNFTables(), and some of them are also referenced in
// syncProxyRules().
type nftablesJumpChain struct {
	dstChain  string // regular chain that is the target of the jump
	srcChain  string // chain that receives the jump rule
	extraArgs string // match expression placed before "jump" in the rule; may be empty
}
   352  
   353  var nftablesJumpChains = []nftablesJumpChain{
   354  	// We can't jump to endpointsCheckChain from filter-prerouting like
   355  	// firewallCheckChain because reject action is only valid in chains using the
   356  	// input, forward or output hooks with kernels before 5.9.
   357  	{nodePortEndpointsCheckChain, filterInputChain, "ct state new"},
   358  	{serviceEndpointsCheckChain, filterInputChain, "ct state new"},
   359  	{serviceEndpointsCheckChain, filterForwardChain, "ct state new"},
   360  	{serviceEndpointsCheckChain, filterOutputChain, "ct state new"},
   361  
   362  	{firewallCheckChain, filterPreroutingChain, "ct state new"},
   363  	{firewallCheckChain, filterOutputChain, "ct state new"},
   364  
   365  	{servicesChain, natOutputChain, ""},
   366  	{servicesChain, natPreroutingChain, ""},
   367  	{masqueradingChain, natPostroutingChain, ""},
   368  
   369  	{clusterIPsCheckChain, filterForwardChain, "ct state new"},
   370  	{clusterIPsCheckChain, filterOutputPostDNATChain, "ct state new"},
   371  }
   372  
   373  // ensureChain adds commands to tx to ensure that chain exists and doesn't contain
   374  // anything from before this transaction (using createdChains to ensure that we don't
   375  // Flush a chain more than once and lose *new* rules as well.)
   376  func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string]) {
   377  	if createdChains.Has(chain) {
   378  		return
   379  	}
   380  	tx.Add(&knftables.Chain{
   381  		Name: chain,
   382  	})
   383  	tx.Flush(&knftables.Chain{
   384  		Name: chain,
   385  	})
   386  	createdChains.Insert(chain)
   387  }
   388  
   389  func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
   390  	ipX := "ip"
   391  	ipvX_addr := "ipv4_addr" //nolint:stylecheck // var name intentionally resembles value
   392  	noLocalhost := "ip daddr != 127.0.0.0/8"
   393  	if proxier.ipFamily == v1.IPv6Protocol {
   394  		ipX = "ip6"
   395  		ipvX_addr = "ipv6_addr"
   396  		noLocalhost = "ip6 daddr != ::1"
   397  	}
   398  
   399  	tx.Add(&knftables.Table{
   400  		Comment: ptr.To("rules for kube-proxy"),
   401  	})
   402  
   403  	// Do an extra "add+delete" once to ensure all previous base chains in the table
   404  	// will be recreated. Otherwise, altering properties (e.g. priority) of these
   405  	// chains would fail the transaction.
   406  	if !proxier.flushed {
   407  		for _, bc := range nftablesBaseChains {
   408  			chain := &knftables.Chain{
   409  				Name: bc.name,
   410  			}
   411  			tx.Add(chain)
   412  			tx.Delete(chain)
   413  		}
   414  		proxier.flushed = true
   415  	}
   416  
   417  	// Create and flush base chains
   418  	for _, bc := range nftablesBaseChains {
   419  		chain := &knftables.Chain{
   420  			Name:     bc.name,
   421  			Type:     ptr.To(bc.chainType),
   422  			Hook:     ptr.To(bc.hook),
   423  			Priority: ptr.To(bc.priority),
   424  		}
   425  		tx.Add(chain)
   426  		tx.Flush(chain)
   427  	}
   428  
   429  	// Create and flush ordinary chains and add rules jumping to them
   430  	createdChains := sets.New[string]()
   431  	for _, c := range nftablesJumpChains {
   432  		ensureChain(c.dstChain, tx, createdChains)
   433  		tx.Add(&knftables.Rule{
   434  			Chain: c.srcChain,
   435  			Rule: knftables.Concat(
   436  				c.extraArgs,
   437  				"jump", c.dstChain,
   438  			),
   439  		})
   440  	}
   441  
   442  	// Ensure all of our other "top-level" chains exist
   443  	for _, chain := range []string{servicesChain, clusterIPsCheckChain, masqueradingChain, markMasqChain} {
   444  		ensureChain(chain, tx, createdChains)
   445  	}
   446  
   447  	// Add the rules in the mark-for-masquerade and masquerading chains
   448  	tx.Add(&knftables.Rule{
   449  		Chain: markMasqChain,
   450  		Rule: knftables.Concat(
   451  			"mark", "set", "mark", "or", proxier.masqueradeMark,
   452  		),
   453  	})
   454  
   455  	tx.Add(&knftables.Rule{
   456  		Chain: masqueradingChain,
   457  		Rule: knftables.Concat(
   458  			"mark", "and", proxier.masqueradeMark, "==", "0",
   459  			"return",
   460  		),
   461  	})
   462  	tx.Add(&knftables.Rule{
   463  		Chain: masqueradingChain,
   464  		Rule: knftables.Concat(
   465  			"mark", "set", "mark", "xor", proxier.masqueradeMark,
   466  		),
   467  	})
   468  	tx.Add(&knftables.Rule{
   469  		Chain: masqueradingChain,
   470  		Rule:  "masquerade fully-random",
   471  	})
   472  
   473  	// add cluster-ips set.
   474  	tx.Add(&knftables.Set{
   475  		Name:    clusterIPsSet,
   476  		Type:    ipvX_addr,
   477  		Comment: ptr.To("Active ClusterIPs"),
   478  	})
   479  
   480  	// reject traffic to invalid ports of ClusterIPs.
   481  	tx.Add(&knftables.Rule{
   482  		Chain: clusterIPsCheckChain,
   483  		Rule: knftables.Concat(
   484  			ipX, "daddr", "@", clusterIPsSet, "reject",
   485  		),
   486  		Comment: ptr.To("Reject traffic to invalid ports of ClusterIPs"),
   487  	})
   488  
   489  	// drop traffic to unallocated ClusterIPs.
   490  	if len(proxier.serviceCIDRs) > 0 {
   491  		tx.Add(&knftables.Rule{
   492  			Chain: clusterIPsCheckChain,
   493  			Rule: knftables.Concat(
   494  				ipX, "daddr", "{", proxier.serviceCIDRs, "}",
   495  				"drop",
   496  			),
   497  			Comment: ptr.To("Drop traffic to unallocated ClusterIPs"),
   498  		})
   499  	}
   500  
   501  	// Fill in nodeport-ips set if needed (or delete it if not). (We do "add+delete"
   502  	// rather than just "delete" when we want to ensure the set doesn't exist, because
   503  	// doing just "delete" would return an error if the set didn't exist.)
   504  	tx.Add(&knftables.Set{
   505  		Name:    nodePortIPsSet,
   506  		Type:    ipvX_addr,
   507  		Comment: ptr.To("IPs that accept NodePort traffic"),
   508  	})
   509  	if proxier.nodePortAddresses.MatchAll() {
   510  		tx.Delete(&knftables.Set{
   511  			Name: nodePortIPsSet,
   512  		})
   513  	} else {
   514  		tx.Flush(&knftables.Set{
   515  			Name: nodePortIPsSet,
   516  		})
   517  		nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
   518  		if err != nil {
   519  			klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
   520  		}
   521  		for _, ip := range nodeIPs {
   522  			if ip.IsLoopback() {
   523  				klog.ErrorS(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported", "address", ip.String())
   524  				continue
   525  			}
   526  			tx.Add(&knftables.Element{
   527  				Set: nodePortIPsSet,
   528  				Key: []string{
   529  					ip.String(),
   530  				},
   531  			})
   532  		}
   533  	}
   534  
   535  	// Set up "no endpoints" drop/reject handling
   536  	tx.Add(&knftables.Map{
   537  		Name:    noEndpointServicesMap,
   538  		Type:    ipvX_addr + " . inet_proto . inet_service : verdict",
   539  		Comment: ptr.To("vmap to drop or reject packets to services with no endpoints"),
   540  	})
   541  	tx.Add(&knftables.Map{
   542  		Name:    noEndpointNodePortsMap,
   543  		Type:    "inet_proto . inet_service : verdict",
   544  		Comment: ptr.To("vmap to drop or reject packets to service nodeports with no endpoints"),
   545  	})
   546  
   547  	tx.Add(&knftables.Chain{
   548  		Name:    rejectChain,
   549  		Comment: ptr.To("helper for @no-endpoint-services / @no-endpoint-nodeports"),
   550  	})
   551  	tx.Flush(&knftables.Chain{
   552  		Name: rejectChain,
   553  	})
   554  	tx.Add(&knftables.Rule{
   555  		Chain: rejectChain,
   556  		Rule:  "reject",
   557  	})
   558  
   559  	tx.Add(&knftables.Rule{
   560  		Chain: serviceEndpointsCheckChain,
   561  		Rule: knftables.Concat(
   562  			ipX, "daddr", ".", "meta l4proto", ".", "th dport",
   563  			"vmap", "@", noEndpointServicesMap,
   564  		),
   565  	})
   566  
   567  	if proxier.nodePortAddresses.MatchAll() {
   568  		tx.Add(&knftables.Rule{
   569  			Chain: nodePortEndpointsCheckChain,
   570  			Rule: knftables.Concat(
   571  				noLocalhost,
   572  				"meta l4proto . th dport",
   573  				"vmap", "@", noEndpointNodePortsMap,
   574  			),
   575  		})
   576  	} else {
   577  		tx.Add(&knftables.Rule{
   578  			Chain: nodePortEndpointsCheckChain,
   579  			Rule: knftables.Concat(
   580  				ipX, "daddr", "@", nodePortIPsSet,
   581  				"meta l4proto . th dport",
   582  				"vmap", "@", noEndpointNodePortsMap,
   583  			),
   584  		})
   585  	}
   586  
   587  	// Set up LoadBalancerSourceRanges firewalling
   588  	tx.Add(&knftables.Map{
   589  		Name:    firewallIPsMap,
   590  		Type:    ipvX_addr + " . inet_proto . inet_service : verdict",
   591  		Comment: ptr.To("destinations that are subject to LoadBalancerSourceRanges"),
   592  	})
   593  
   594  	ensureChain(firewallCheckChain, tx, createdChains)
   595  	tx.Add(&knftables.Rule{
   596  		Chain: firewallCheckChain,
   597  		Rule: knftables.Concat(
   598  			ipX, "daddr", ".", "meta l4proto", ".", "th dport",
   599  			"vmap", "@", firewallIPsMap,
   600  		),
   601  	})
   602  
   603  	// Set up service dispatch
   604  	tx.Add(&knftables.Map{
   605  		Name:    serviceIPsMap,
   606  		Type:    ipvX_addr + " . inet_proto . inet_service : verdict",
   607  		Comment: ptr.To("ClusterIP, ExternalIP and LoadBalancer IP traffic"),
   608  	})
   609  	tx.Add(&knftables.Map{
   610  		Name:    serviceNodePortsMap,
   611  		Type:    "inet_proto . inet_service : verdict",
   612  		Comment: ptr.To("NodePort traffic"),
   613  	})
   614  	tx.Add(&knftables.Rule{
   615  		Chain: servicesChain,
   616  		Rule: knftables.Concat(
   617  			ipX, "daddr", ".", "meta l4proto", ".", "th dport",
   618  			"vmap", "@", serviceIPsMap,
   619  		),
   620  	})
   621  	if proxier.nodePortAddresses.MatchAll() {
   622  		tx.Add(&knftables.Rule{
   623  			Chain: servicesChain,
   624  			Rule: knftables.Concat(
   625  				"fib daddr type local",
   626  				noLocalhost,
   627  				"meta l4proto . th dport",
   628  				"vmap", "@", serviceNodePortsMap,
   629  			),
   630  		})
   631  	} else {
   632  		tx.Add(&knftables.Rule{
   633  			Chain: servicesChain,
   634  			Rule: knftables.Concat(
   635  				ipX, "daddr @nodeport-ips",
   636  				"meta l4proto . th dport",
   637  				"vmap", "@", serviceNodePortsMap,
   638  			),
   639  		})
   640  	}
   641  }
   642  
   643  // CleanupLeftovers removes all nftables rules and chains created by the Proxier
   644  // It returns true if an error was encountered. Errors are logged.
   645  func CleanupLeftovers() bool {
   646  	var encounteredError bool
   647  
   648  	for _, family := range []knftables.Family{knftables.IPv4Family, knftables.IPv6Family} {
   649  		nft, err := knftables.New(family, kubeProxyTable)
   650  		if err == nil {
   651  			tx := nft.NewTransaction()
   652  			tx.Delete(&knftables.Table{})
   653  			err = nft.Run(context.TODO(), tx)
   654  		}
   655  		if err != nil && !knftables.IsNotFound(err) {
   656  			klog.ErrorS(err, "Error cleaning up nftables rules")
   657  			encounteredError = true
   658  		}
   659  	}
   660  
   661  	return encounteredError
   662  }
   663  
   664  // Sync is called to synchronize the proxier state to nftables as soon as possible.
   665  func (proxier *Proxier) Sync() {
   666  	if proxier.healthzServer != nil {
   667  		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
   668  	}
   669  	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
   670  	proxier.syncRunner.Run()
   671  }
   672  
   673  // SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
   674  func (proxier *Proxier) SyncLoop() {
   675  	// Update healthz timestamp at beginning in case Sync() never succeeds.
   676  	if proxier.healthzServer != nil {
   677  		proxier.healthzServer.Updated(proxier.ipFamily)
   678  	}
   679  
   680  	// synthesize "last change queued" time as the informers are syncing.
   681  	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
   682  	proxier.syncRunner.Loop(wait.NeverStop)
   683  }
   684  
   685  func (proxier *Proxier) setInitialized(value bool) {
   686  	var initialized int32
   687  	if value {
   688  		initialized = 1
   689  	}
   690  	atomic.StoreInt32(&proxier.initialized, initialized)
   691  }
   692  
   693  func (proxier *Proxier) isInitialized() bool {
   694  	return atomic.LoadInt32(&proxier.initialized) > 0
   695  }
   696  
// OnServiceAdd is called whenever creation of new service object
// is observed. It is handled as an update from "no service" (nil) to service.
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
	proxier.OnServiceUpdate(nil, service)
}
   702  
   703  // OnServiceUpdate is called whenever modification of an existing
   704  // service object is observed.
   705  func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
   706  	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
   707  		proxier.Sync()
   708  	}
   709  }
   710  
// OnServiceDelete is called whenever deletion of an existing service
// object is observed. It is handled as an update from service to "no service" (nil).
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
	proxier.OnServiceUpdate(service, nil)

}
   717  
   718  // OnServiceSynced is called once all the initial event handlers were
   719  // called and the state is fully propagated to local cache.
   720  func (proxier *Proxier) OnServiceSynced() {
   721  	proxier.mu.Lock()
   722  	proxier.servicesSynced = true
   723  	proxier.setInitialized(proxier.endpointSlicesSynced)
   724  	proxier.mu.Unlock()
   725  
   726  	// Sync unconditionally - this is called once per lifetime.
   727  	proxier.syncProxyRules()
   728  }
   729  
   730  // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
   731  // is observed.
   732  func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
   733  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
   734  		proxier.Sync()
   735  	}
   736  }
   737  
   738  // OnEndpointSliceUpdate is called whenever modification of an existing endpoint
   739  // slice object is observed.
   740  func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
   741  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
   742  		proxier.Sync()
   743  	}
   744  }
   745  
   746  // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
   747  // object is observed.
   748  func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
   749  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
   750  		proxier.Sync()
   751  	}
   752  }
   753  
   754  // OnEndpointSlicesSynced is called once all the initial event handlers were
   755  // called and the state is fully propagated to local cache.
   756  func (proxier *Proxier) OnEndpointSlicesSynced() {
   757  	proxier.mu.Lock()
   758  	proxier.endpointSlicesSynced = true
   759  	proxier.setInitialized(proxier.servicesSynced)
   760  	proxier.mu.Unlock()
   761  
   762  	// Sync unconditionally - this is called once per lifetime.
   763  	proxier.syncProxyRules()
   764  }
   765  
   766  // OnNodeAdd is called whenever creation of new node object
   767  // is observed.
   768  func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
   769  	if node.Name != proxier.hostname {
   770  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
   771  			"eventNode", node.Name, "currentNode", proxier.hostname)
   772  		return
   773  	}
   774  
   775  	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
   776  		return
   777  	}
   778  
   779  	proxier.mu.Lock()
   780  	proxier.nodeLabels = map[string]string{}
   781  	for k, v := range node.Labels {
   782  		proxier.nodeLabels[k] = v
   783  	}
   784  	proxier.mu.Unlock()
   785  	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
   786  
   787  	proxier.Sync()
   788  }
   789  
   790  // OnNodeUpdate is called whenever modification of an existing
   791  // node object is observed.
   792  func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
   793  	if node.Name != proxier.hostname {
   794  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
   795  			"eventNode", node.Name, "currentNode", proxier.hostname)
   796  		return
   797  	}
   798  
   799  	if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
   800  		return
   801  	}
   802  
   803  	proxier.mu.Lock()
   804  	proxier.nodeLabels = map[string]string{}
   805  	for k, v := range node.Labels {
   806  		proxier.nodeLabels[k] = v
   807  	}
   808  	proxier.mu.Unlock()
   809  	klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
   810  
   811  	proxier.Sync()
   812  }
   813  
   814  // OnNodeDelete is called whenever deletion of an existing node
   815  // object is observed.
   816  func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
   817  	if node.Name != proxier.hostname {
   818  		klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
   819  			"eventNode", node.Name, "currentNode", proxier.hostname)
   820  		return
   821  	}
   822  
   823  	proxier.mu.Lock()
   824  	proxier.nodeLabels = nil
   825  	proxier.mu.Unlock()
   826  
   827  	proxier.Sync()
   828  }
   829  
// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
//
// The body is empty, so this is a no-op for this proxier.
func (proxier *Proxier) OnNodeSynced() {
}
   834  
   835  // OnServiceCIDRsChanged is called whenever a change is observed
   836  // in any of the ServiceCIDRs, and provides complete list of service cidrs.
   837  func (proxier *Proxier) OnServiceCIDRsChanged(cidrs []string) {
   838  	proxier.mu.Lock()
   839  	defer proxier.mu.Unlock()
   840  
   841  	cidrsForProxier := make([]string, 0)
   842  	for _, cidr := range cidrs {
   843  		isIPv4CIDR := netutils.IsIPv4CIDRString(cidr)
   844  		if proxier.ipFamily == v1.IPv4Protocol && isIPv4CIDR {
   845  			cidrsForProxier = append(cidrsForProxier, cidr)
   846  		}
   847  
   848  		if proxier.ipFamily == v1.IPv6Protocol && !isIPv4CIDR {
   849  			cidrsForProxier = append(cidrsForProxier, cidr)
   850  		}
   851  	}
   852  	proxier.serviceCIDRs = strings.Join(cidrsForProxier, ",")
   853  }
   854  
const (
	// Maximum length for one of our chain name prefixes, including the trailing
	// hyphen. (The prefixes declared in this file, such as "service-" and
	// "external-", are all shorter than this.)
	chainNamePrefixLengthMax = 16

	// Maximum length of the string returned from servicePortChainNameBase or
	// servicePortEndpointChainNameBase: what remains of knftables.NameLengthMax
	// after reserving room for the longest allowed prefix. hashAndTruncate
	// enforces this limit.
	chainNameBaseLengthMax = knftables.NameLengthMax - chainNamePrefixLengthMax
)
   864  
// Name prefixes for the per-service and per-endpoint nftables objects used by
// this proxier. Each ends with a hyphen and fits within
// chainNamePrefixLengthMax. isAffinitySetName uses
// servicePortEndpointAffinityNamePrefix to recognize affinity sets; presumably
// the other prefixes are prepended to the results of servicePortChainNameBase
// and servicePortEndpointChainNameBase to form chain names (that code is not in
// this part of the file -- confirm there).
const (
	servicePortPolicyClusterChainNamePrefix = "service-"
	servicePortPolicyLocalChainNamePrefix   = "local-"
	serviceExternalChainNamePrefix          = "external-"
	servicePortEndpointChainNamePrefix      = "endpoint-"
	servicePortEndpointAffinityNamePrefix   = "affinity-"
	servicePortFirewallChainNamePrefix      = "firewall-"
)
   873  
   874  // hashAndTruncate prefixes name with a hash of itself and then truncates to
   875  // chainNameBaseLengthMax. The hash ensures that (a) the name is still unique if we have
   876  // to truncate the end, and (b) it's visually distinguishable from other chains that would
   877  // otherwise have nearly identical names (e.g., different endpoint chains for a given
   878  // service that differ in only a single digit).
   879  func hashAndTruncate(name string) string {
   880  	hash := sha256.Sum256([]byte(name))
   881  	encoded := base32.StdEncoding.EncodeToString(hash[:])
   882  	name = encoded[:8] + "-" + name
   883  	if len(name) > chainNameBaseLengthMax {
   884  		name = name[:chainNameBaseLengthMax-3] + "..."
   885  	}
   886  	return name
   887  }
   888  
   889  // servicePortChainNameBase returns the base name for a chain for the given ServicePort.
   890  // This is something like "HASH-namespace/serviceName/protocol/portName", e.g,
   891  // "ULMVA6XW-ns1/svc1/tcp/p80".
   892  func servicePortChainNameBase(servicePortName *proxy.ServicePortName, protocol string) string {
   893  	// nftables chains can contain the characters [A-Za-z0-9_./-] (but must start with
   894  	// a letter, underscore, or dot).
   895  	//
   896  	// Namespace, Service, and Port names can contain [a-z0-9-] (with some additional
   897  	// restrictions that aren't relevant here).
   898  	//
   899  	// Protocol is /(tcp|udp|sctp)/.
   900  	//
   901  	// Thus, we can safely use all Namespace names, Service names, protocol values,
   902  	// and Port names directly in nftables chain names (though note that this assumes
   903  	// that the chain name won't *start* with any of those strings, since that might
   904  	// be illegal). We use "/" to separate the parts of the name, which is one of the
   905  	// two characters allowed in a chain name that isn't allowed in our input strings.
   906  
   907  	name := fmt.Sprintf("%s/%s/%s/%s",
   908  		servicePortName.NamespacedName.Namespace,
   909  		servicePortName.NamespacedName.Name,
   910  		protocol,
   911  		servicePortName.Port,
   912  	)
   913  
   914  	// The namespace, service, and port name can each be up to 63 characters, protocol
   915  	// can be up to 4, plus 8 for the hash and 4 additional punctuation characters.
   916  	// That's a total of 205, which is less than chainNameBaseLengthMax (240). So this
   917  	// will never actually return a truncated name.
   918  	return hashAndTruncate(name)
   919  }
   920  
   921  // servicePortEndpointChainNameBase returns the suffix for chain names for the given
   922  // endpoint. This is something like
   923  // "HASH-namespace/serviceName/protocol/portName__endpointIP/endpointport", e.g.,
   924  // "5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80".
   925  func servicePortEndpointChainNameBase(servicePortName *proxy.ServicePortName, protocol, endpoint string) string {
   926  	// As above in servicePortChainNameBase: Namespace, Service, Port, Protocol, and
   927  	// EndpointPort are all safe to copy into the chain name directly. But if
   928  	// EndpointIP is IPv6 then it will contain colons, which aren't allowed in a chain
   929  	// name. IPv6 IPs are also quite long, but we can't safely truncate them (e.g. to
   930  	// only the final segment) because (especially for manually-created external
   931  	// endpoints), we can't know for sure that any part of them is redundant.
   932  
   933  	endpointIP, endpointPort, _ := net.SplitHostPort(endpoint)
   934  	if strings.Contains(endpointIP, ":") {
   935  		endpointIP = strings.ReplaceAll(endpointIP, ":", ".")
   936  	}
   937  
   938  	// As above, we use "/" to separate parts of the name, and "__" to separate the
   939  	// "service" part from the "endpoint" part.
   940  	name := fmt.Sprintf("%s/%s/%s/%s__%s/%s",
   941  		servicePortName.NamespacedName.Namespace,
   942  		servicePortName.NamespacedName.Name,
   943  		protocol,
   944  		servicePortName.Port,
   945  		endpointIP,
   946  		endpointPort,
   947  	)
   948  
   949  	// The part of name before the "__" can be up to 205 characters (as with
   950  	// servicePortChainNameBase above). An IPv6 address can be up to 39 characters, and
   951  	// a port can be up to 5 digits, plus 3 punctuation characters gives a max total
   952  	// length of 252, well over chainNameBaseLengthMax (240), so truncation is
   953  	// theoretically possible (though incredibly unlikely).
   954  	return hashAndTruncate(name)
   955  }
   956  
   957  func isServiceChainName(chainString string) bool {
   958  	// The chains returned from servicePortChainNameBase and
   959  	// servicePortEndpointChainNameBase will always have at least one "/" in them.
   960  	// Since none of our "stock" chain names use slashes, we can distinguish them this
   961  	// way.
   962  	return strings.Contains(chainString, "/")
   963  }
   964  
   965  func isAffinitySetName(set string) bool {
   966  	return strings.HasPrefix(set, servicePortEndpointAffinityNamePrefix)
   967  }
   968  
   969  // This is where all of the nftables calls happen.
   970  // This assumes proxier.mu is NOT held
   971  func (proxier *Proxier) syncProxyRules() {
   972  	proxier.mu.Lock()
   973  	defer proxier.mu.Unlock()
   974  
   975  	// don't sync rules till we've received services and endpoints
   976  	if !proxier.isInitialized() {
   977  		klog.V(2).InfoS("Not syncing nftables until Services and Endpoints have been received from master")
   978  		return
   979  	}
   980  
   981  	//
   982  	// Below this point we will not return until we try to write the nftables rules.
   983  	//
   984  
   985  	// Keep track of how long syncs take.
   986  	start := time.Now()
   987  	defer func() {
   988  		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
   989  		klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
   990  	}()
   991  
   992  	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
   993  	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
   994  
   995  	klog.V(2).InfoS("Syncing nftables rules")
   996  
   997  	success := false
   998  	defer func() {
   999  		if !success {
  1000  			klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
  1001  			proxier.syncRunner.RetryAfter(proxier.syncPeriod)
  1002  		}
  1003  	}()
  1004  
  1005  	// If there are sufficiently-stale chains left over from previous transactions,
  1006  	// try to delete them now.
  1007  	if len(proxier.staleChains) > 0 {
  1008  		oneSecondAgo := start.Add(-time.Second)
  1009  		tx := proxier.nftables.NewTransaction()
  1010  		deleted := 0
  1011  		for chain, modtime := range proxier.staleChains {
  1012  			if modtime.Before(oneSecondAgo) {
  1013  				tx.Delete(&knftables.Chain{
  1014  					Name: chain,
  1015  				})
  1016  				delete(proxier.staleChains, chain)
  1017  				deleted++
  1018  			}
  1019  		}
  1020  		if deleted > 0 {
  1021  			klog.InfoS("Deleting stale nftables chains", "numChains", deleted)
  1022  			err := proxier.nftables.Run(context.TODO(), tx)
  1023  			if err != nil {
  1024  				// We already deleted the entries from staleChains, but if
  1025  				// the chains still exist, they'll just get added back
  1026  				// (with a later timestamp) at the end of the sync.
  1027  				klog.ErrorS(err, "Unable to delete stale chains; will retry later")
  1028  				// FIXME: metric
  1029  			}
  1030  		}
  1031  	}
  1032  
  1033  	// Now start the actual syncing transaction
  1034  	tx := proxier.nftables.NewTransaction()
  1035  	proxier.setupNFTables(tx)
  1036  
  1037  	// We need to use, eg, "ip daddr" for IPv4 but "ip6 daddr" for IPv6
  1038  	ipX := "ip"
  1039  	ipvX_addr := "ipv4_addr" //nolint:stylecheck // var name intentionally resembles value
  1040  	if proxier.ipFamily == v1.IPv6Protocol {
  1041  		ipX = "ip6"
  1042  		ipvX_addr = "ipv6_addr"
  1043  	}
  1044  
  1045  	// We currently fully-rebuild our sets and maps on each resync
  1046  	tx.Flush(&knftables.Set{
  1047  		Name: clusterIPsSet,
  1048  	})
  1049  	tx.Flush(&knftables.Map{
  1050  		Name: firewallIPsMap,
  1051  	})
  1052  	tx.Flush(&knftables.Map{
  1053  		Name: noEndpointServicesMap,
  1054  	})
  1055  	tx.Flush(&knftables.Map{
  1056  		Name: noEndpointNodePortsMap,
  1057  	})
  1058  	tx.Flush(&knftables.Map{
  1059  		Name: serviceIPsMap,
  1060  	})
  1061  	tx.Flush(&knftables.Map{
  1062  		Name: serviceNodePortsMap,
  1063  	})
  1064  
  1065  	// Accumulate service/endpoint chains and affinity sets to keep.
  1066  	activeChains := sets.New[string]()
  1067  	activeAffinitySets := sets.New[string]()
  1068  
  1069  	// Compute total number of endpoint chains across all services
  1070  	// to get a sense of how big the cluster is.
  1071  	totalEndpoints := 0
  1072  	for svcName := range proxier.svcPortMap {
  1073  		totalEndpoints += len(proxier.endpointsMap[svcName])
  1074  	}
  1075  
  1076  	// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
  1077  	// metric.
  1078  	serviceNoLocalEndpointsTotalInternal := 0
  1079  	serviceNoLocalEndpointsTotalExternal := 0
  1080  
  1081  	// Build rules for each service-port.
  1082  	for svcName, svc := range proxier.svcPortMap {
  1083  		svcInfo, ok := svc.(*servicePortInfo)
  1084  		if !ok {
  1085  			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
  1086  			continue
  1087  		}
  1088  		protocol := strings.ToLower(string(svcInfo.Protocol()))
  1089  		svcPortNameString := svcInfo.nameString
  1090  
  1091  		// Figure out the endpoints for Cluster and Local traffic policy.
  1092  		// allLocallyReachableEndpoints is the set of all endpoints that can be routed to
  1093  		// from this node, given the service's traffic policies. hasEndpoints is true
  1094  		// if the service has any usable endpoints on any node, not just this one.
  1095  		allEndpoints := proxier.endpointsMap[svcName]
  1096  		clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
  1097  
  1098  		// Note the endpoint chains that will be used
  1099  		for _, ep := range allLocallyReachableEndpoints {
  1100  			if epInfo, ok := ep.(*endpointInfo); ok {
  1101  				ensureChain(epInfo.chainName, tx, activeChains)
  1102  			}
  1103  		}
  1104  
  1105  		// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
  1106  		clusterPolicyChain := svcInfo.clusterPolicyChainName
  1107  		usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
  1108  		if usesClusterPolicyChain {
  1109  			ensureChain(clusterPolicyChain, tx, activeChains)
  1110  		}
  1111  
  1112  		// localPolicyChain contains the endpoints used with "Local" traffic policy
  1113  		localPolicyChain := svcInfo.localPolicyChainName
  1114  		usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
  1115  		if usesLocalPolicyChain {
  1116  			ensureChain(localPolicyChain, tx, activeChains)
  1117  		}
  1118  
  1119  		// internalPolicyChain is the chain containing the endpoints for
  1120  		// "internal" (ClusterIP) traffic. internalTrafficChain is the chain that
  1121  		// internal traffic is routed to (which is always the same as
  1122  		// internalPolicyChain). hasInternalEndpoints is true if we should
  1123  		// generate rules pointing to internalTrafficChain, or false if there are
  1124  		// no available internal endpoints.
  1125  		internalPolicyChain := clusterPolicyChain
  1126  		hasInternalEndpoints := hasEndpoints
  1127  		if svcInfo.InternalPolicyLocal() {
  1128  			internalPolicyChain = localPolicyChain
  1129  			if len(localEndpoints) == 0 {
  1130  				hasInternalEndpoints = false
  1131  			}
  1132  		}
  1133  		internalTrafficChain := internalPolicyChain
  1134  
  1135  		// Similarly, externalPolicyChain is the chain containing the endpoints
  1136  		// for "external" (NodePort, LoadBalancer, and ExternalIP) traffic.
  1137  		// externalTrafficChain is the chain that external traffic is routed to
  1138  		// (which is always the service's "EXT" chain). hasExternalEndpoints is
  1139  		// true if there are endpoints that will be reached by external traffic.
  1140  		// (But we may still have to generate externalTrafficChain even if there
  1141  		// are no external endpoints, to ensure that the short-circuit rules for
  1142  		// local traffic are set up.)
  1143  		externalPolicyChain := clusterPolicyChain
  1144  		hasExternalEndpoints := hasEndpoints
  1145  		if svcInfo.ExternalPolicyLocal() {
  1146  			externalPolicyChain = localPolicyChain
  1147  			if len(localEndpoints) == 0 {
  1148  				hasExternalEndpoints = false
  1149  			}
  1150  		}
  1151  		externalTrafficChain := svcInfo.externalChainName // eventually jumps to externalPolicyChain
  1152  
  1153  		// usesExternalTrafficChain is based on hasEndpoints, not hasExternalEndpoints,
  1154  		// because we need the local-traffic-short-circuiting rules even when there
  1155  		// are no externally-usable endpoints.
  1156  		usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
  1157  		if usesExternalTrafficChain {
  1158  			ensureChain(externalTrafficChain, tx, activeChains)
  1159  		}
  1160  
  1161  		var internalTrafficFilterVerdict, externalTrafficFilterVerdict string
  1162  		if !hasEndpoints {
  1163  			// The service has no endpoints at all; hasInternalEndpoints and
  1164  			// hasExternalEndpoints will also be false, and we will not
  1165  			// generate any chains in the "nat" table for the service; only
  1166  			// rules in the "filter" table rejecting incoming packets for
  1167  			// the service's IPs.
  1168  			internalTrafficFilterVerdict = fmt.Sprintf("goto %s", rejectChain)
  1169  			externalTrafficFilterVerdict = fmt.Sprintf("goto %s", rejectChain)
  1170  		} else {
  1171  			if !hasInternalEndpoints {
  1172  				// The internalTrafficPolicy is "Local" but there are no local
  1173  				// endpoints. Traffic to the clusterIP will be dropped, but
  1174  				// external traffic may still be accepted.
  1175  				internalTrafficFilterVerdict = "drop"
  1176  				serviceNoLocalEndpointsTotalInternal++
  1177  			}
  1178  			if !hasExternalEndpoints {
  1179  				// The externalTrafficPolicy is "Local" but there are no
  1180  				// local endpoints. Traffic to "external" IPs from outside
  1181  				// the cluster will be dropped, but traffic from inside
  1182  				// the cluster may still be accepted.
  1183  				externalTrafficFilterVerdict = "drop"
  1184  				serviceNoLocalEndpointsTotalExternal++
  1185  			}
  1186  		}
  1187  
  1188  		// Capture the clusterIP.
  1189  		tx.Add(&knftables.Element{
  1190  			Set: clusterIPsSet,
  1191  			Key: []string{svcInfo.ClusterIP().String()},
  1192  		})
  1193  		if hasInternalEndpoints {
  1194  			tx.Add(&knftables.Element{
  1195  				Map: serviceIPsMap,
  1196  				Key: []string{
  1197  					svcInfo.ClusterIP().String(),
  1198  					protocol,
  1199  					strconv.Itoa(svcInfo.Port()),
  1200  				},
  1201  				Value: []string{
  1202  					fmt.Sprintf("goto %s", internalTrafficChain),
  1203  				},
  1204  			})
  1205  		} else {
  1206  			// No endpoints.
  1207  			tx.Add(&knftables.Element{
  1208  				Map: noEndpointServicesMap,
  1209  				Key: []string{
  1210  					svcInfo.ClusterIP().String(),
  1211  					protocol,
  1212  					strconv.Itoa(svcInfo.Port()),
  1213  				},
  1214  				Value: []string{
  1215  					internalTrafficFilterVerdict,
  1216  				},
  1217  				Comment: &svcPortNameString,
  1218  			})
  1219  		}
  1220  
  1221  		// Capture externalIPs.
  1222  		for _, externalIP := range svcInfo.ExternalIPs() {
  1223  			if hasEndpoints {
  1224  				// Send traffic bound for external IPs to the "external
  1225  				// destinations" chain.
  1226  				tx.Add(&knftables.Element{
  1227  					Map: serviceIPsMap,
  1228  					Key: []string{
  1229  						externalIP.String(),
  1230  						protocol,
  1231  						strconv.Itoa(svcInfo.Port()),
  1232  					},
  1233  					Value: []string{
  1234  						fmt.Sprintf("goto %s", externalTrafficChain),
  1235  					},
  1236  				})
  1237  			}
  1238  			if !hasExternalEndpoints {
  1239  				// Either no endpoints at all (REJECT) or no endpoints for
  1240  				// external traffic (DROP anything that didn't get
  1241  				// short-circuited by the EXT chain.)
  1242  				tx.Add(&knftables.Element{
  1243  					Map: noEndpointServicesMap,
  1244  					Key: []string{
  1245  						externalIP.String(),
  1246  						protocol,
  1247  						strconv.Itoa(svcInfo.Port()),
  1248  					},
  1249  					Value: []string{
  1250  						externalTrafficFilterVerdict,
  1251  					},
  1252  					Comment: &svcPortNameString,
  1253  				})
  1254  			}
  1255  		}
  1256  
  1257  		usesFWChain := len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
  1258  		fwChain := svcInfo.firewallChainName
  1259  		if usesFWChain {
  1260  			ensureChain(fwChain, tx, activeChains)
  1261  			var sources []string
  1262  			allowFromNode := false
  1263  			for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
  1264  				if len(sources) > 0 {
  1265  					sources = append(sources, ",")
  1266  				}
  1267  				sources = append(sources, cidr.String())
  1268  				if cidr.Contains(proxier.nodeIP) {
  1269  					allowFromNode = true
  1270  				}
  1271  			}
  1272  			// For VIP-like LBs, the VIP is often added as a local
  1273  			// address (via an IP route rule).  In that case, a request
  1274  			// from a node to the VIP will not hit the loadbalancer but
  1275  			// will loop back with the source IP set to the VIP.  We
  1276  			// need the following rules to allow requests from this node.
  1277  			if allowFromNode {
  1278  				for _, lbip := range svcInfo.LoadBalancerVIPs() {
  1279  					sources = append(sources, ",", lbip.String())
  1280  				}
  1281  			}
  1282  			tx.Add(&knftables.Rule{
  1283  				Chain: fwChain,
  1284  				Rule: knftables.Concat(
  1285  					ipX, "saddr", "!=", "{", sources, "}",
  1286  					"drop",
  1287  				),
  1288  			})
  1289  		}
  1290  
  1291  		// Capture load-balancer ingress.
  1292  		for _, lbip := range svcInfo.LoadBalancerVIPs() {
  1293  			if hasEndpoints {
  1294  				tx.Add(&knftables.Element{
  1295  					Map: serviceIPsMap,
  1296  					Key: []string{
  1297  						lbip.String(),
  1298  						protocol,
  1299  						strconv.Itoa(svcInfo.Port()),
  1300  					},
  1301  					Value: []string{
  1302  						fmt.Sprintf("goto %s", externalTrafficChain),
  1303  					},
  1304  				})
  1305  			}
  1306  
  1307  			if usesFWChain {
  1308  				tx.Add(&knftables.Element{
  1309  					Map: firewallIPsMap,
  1310  					Key: []string{
  1311  						lbip.String(),
  1312  						protocol,
  1313  						strconv.Itoa(svcInfo.Port()),
  1314  					},
  1315  					Value: []string{
  1316  						fmt.Sprintf("goto %s", fwChain),
  1317  					},
  1318  					Comment: &svcPortNameString,
  1319  				})
  1320  			}
  1321  		}
  1322  		if !hasExternalEndpoints {
  1323  			// Either no endpoints at all (REJECT) or no endpoints for
  1324  			// external traffic (DROP anything that didn't get short-circuited
  1325  			// by the EXT chain.)
  1326  			for _, lbip := range svcInfo.LoadBalancerVIPs() {
  1327  				tx.Add(&knftables.Element{
  1328  					Map: noEndpointServicesMap,
  1329  					Key: []string{
  1330  						lbip.String(),
  1331  						protocol,
  1332  						strconv.Itoa(svcInfo.Port()),
  1333  					},
  1334  					Value: []string{
  1335  						externalTrafficFilterVerdict,
  1336  					},
  1337  					Comment: &svcPortNameString,
  1338  				})
  1339  			}
  1340  		}
  1341  
  1342  		// Capture nodeports.
  1343  		if svcInfo.NodePort() != 0 {
  1344  			if hasEndpoints {
  1345  				// Jump to the external destination chain.  For better or for
  1346  				// worse, nodeports are not subect to loadBalancerSourceRanges,
  1347  				// and we can't change that.
  1348  				tx.Add(&knftables.Element{
  1349  					Map: serviceNodePortsMap,
  1350  					Key: []string{
  1351  						protocol,
  1352  						strconv.Itoa(svcInfo.NodePort()),
  1353  					},
  1354  					Value: []string{
  1355  						fmt.Sprintf("goto %s", externalTrafficChain),
  1356  					},
  1357  				})
  1358  			}
  1359  			if !hasExternalEndpoints {
  1360  				// Either no endpoints at all (REJECT) or no endpoints for
  1361  				// external traffic (DROP anything that didn't get
  1362  				// short-circuited by the EXT chain.)
  1363  				tx.Add(&knftables.Element{
  1364  					Map: noEndpointNodePortsMap,
  1365  					Key: []string{
  1366  						protocol,
  1367  						strconv.Itoa(svcInfo.NodePort()),
  1368  					},
  1369  					Value: []string{
  1370  						externalTrafficFilterVerdict,
  1371  					},
  1372  					Comment: &svcPortNameString,
  1373  				})
  1374  			}
  1375  		}
  1376  
  1377  		// Set up internal traffic handling.
  1378  		if hasInternalEndpoints {
  1379  			if proxier.masqueradeAll {
  1380  				tx.Add(&knftables.Rule{
  1381  					Chain: internalTrafficChain,
  1382  					Rule: knftables.Concat(
  1383  						ipX, "daddr", svcInfo.ClusterIP(),
  1384  						protocol, "dport", svcInfo.Port(),
  1385  						"jump", markMasqChain,
  1386  					),
  1387  				})
  1388  			} else if proxier.localDetector.IsImplemented() {
  1389  				// This masquerades off-cluster traffic to a service VIP. The
  1390  				// idea is that you can establish a static route for your
  1391  				// Service range, routing to any node, and that node will
  1392  				// bridge into the Service for you. Since that might bounce
  1393  				// off-node, we masquerade here.
  1394  				tx.Add(&knftables.Rule{
  1395  					Chain: internalTrafficChain,
  1396  					Rule: knftables.Concat(
  1397  						ipX, "daddr", svcInfo.ClusterIP(),
  1398  						protocol, "dport", svcInfo.Port(),
  1399  						proxier.localDetector.IfNotLocalNFT(),
  1400  						"jump", markMasqChain,
  1401  					),
  1402  				})
  1403  			}
  1404  		}
  1405  
  1406  		// Set up external traffic handling (if any "external" destinations are
  1407  		// enabled). All captured traffic for all external destinations should
  1408  		// jump to externalTrafficChain, which will handle some special cases and
  1409  		// then jump to externalPolicyChain.
  1410  		if usesExternalTrafficChain {
  1411  			if !svcInfo.ExternalPolicyLocal() {
  1412  				// If we are using non-local endpoints we need to masquerade,
  1413  				// in case we cross nodes.
  1414  				tx.Add(&knftables.Rule{
  1415  					Chain: externalTrafficChain,
  1416  					Rule: knftables.Concat(
  1417  						"jump", markMasqChain,
  1418  					),
  1419  				})
  1420  			} else {
  1421  				// If we are only using same-node endpoints, we can retain the
  1422  				// source IP in most cases.
  1423  
  1424  				if proxier.localDetector.IsImplemented() {
  1425  					// Treat all locally-originated pod -> external destination
  1426  					// traffic as a special-case.  It is subject to neither
  1427  					// form of traffic policy, which simulates going up-and-out
  1428  					// to an external load-balancer and coming back in.
  1429  					tx.Add(&knftables.Rule{
  1430  						Chain: externalTrafficChain,
  1431  						Rule: knftables.Concat(
  1432  							proxier.localDetector.IfLocalNFT(),
  1433  							"goto", clusterPolicyChain,
  1434  						),
  1435  						Comment: ptr.To("short-circuit pod traffic"),
  1436  					})
  1437  				}
  1438  
  1439  				// Locally originated traffic (not a pod, but the host node)
  1440  				// still needs masquerade because the LBIP itself is a local
  1441  				// address, so that will be the chosen source IP.
  1442  				tx.Add(&knftables.Rule{
  1443  					Chain: externalTrafficChain,
  1444  					Rule: knftables.Concat(
  1445  						"fib", "saddr", "type", "local",
  1446  						"jump", markMasqChain,
  1447  					),
  1448  					Comment: ptr.To("masquerade local traffic"),
  1449  				})
  1450  
  1451  				// Redirect all src-type=LOCAL -> external destination to the
  1452  				// policy=cluster chain. This allows traffic originating
  1453  				// from the host to be redirected to the service correctly.
  1454  				tx.Add(&knftables.Rule{
  1455  					Chain: externalTrafficChain,
  1456  					Rule: knftables.Concat(
  1457  						"fib", "saddr", "type", "local",
  1458  						"goto", clusterPolicyChain,
  1459  					),
  1460  					Comment: ptr.To("short-circuit local traffic"),
  1461  				})
  1462  			}
  1463  
  1464  			// Anything else falls thru to the appropriate policy chain.
  1465  			if hasExternalEndpoints {
  1466  				tx.Add(&knftables.Rule{
  1467  					Chain: externalTrafficChain,
  1468  					Rule: knftables.Concat(
  1469  						"goto", externalPolicyChain,
  1470  					),
  1471  				})
  1472  			}
  1473  		}
  1474  
  1475  		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1476  			// Generate the per-endpoint affinity sets
  1477  			for _, ep := range allLocallyReachableEndpoints {
  1478  				epInfo, ok := ep.(*endpointInfo)
  1479  				if !ok {
  1480  					klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
  1481  					continue
  1482  				}
  1483  
  1484  				// Create a set to store current affinity mappings. As
  1485  				// with the iptables backend, endpoint affinity is
  1486  				// recorded for connections from a particular source IP
  1487  				// (without regard to source port) to a particular
  1488  				// ServicePort (without regard to which service IP was
  1489  				// used to reach the service). This may be changed in the
  1490  				// future.
  1491  				tx.Add(&knftables.Set{
  1492  					Name: epInfo.affinitySetName,
  1493  					Type: ipvX_addr,
  1494  					Flags: []knftables.SetFlag{
  1495  						// The nft docs say "dynamic" is only
  1496  						// needed for sets containing stateful
  1497  						// objects (eg counters), but (at least on
  1498  						// RHEL8) if we create the set without
  1499  						// "dynamic", it later gets mutated to
  1500  						// have it, and then the next attempt to
  1501  						// tx.Add() it here fails because it looks
  1502  						// like we're trying to change the flags.
  1503  						knftables.DynamicFlag,
  1504  						knftables.TimeoutFlag,
  1505  					},
  1506  					Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second),
  1507  				})
  1508  				activeAffinitySets.Insert(epInfo.affinitySetName)
  1509  			}
  1510  		}
  1511  
  1512  		// If Cluster policy is in use, create the chain and create rules jumping
  1513  		// from clusterPolicyChain to the clusterEndpoints
  1514  		if usesClusterPolicyChain {
  1515  			proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
  1516  		}
  1517  
  1518  		// If Local policy is in use, create rules jumping from localPolicyChain
  1519  		// to the localEndpoints
  1520  		if usesLocalPolicyChain {
  1521  			proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
  1522  		}
  1523  
  1524  		// Generate the per-endpoint chains
  1525  		for _, ep := range allLocallyReachableEndpoints {
  1526  			epInfo, ok := ep.(*endpointInfo)
  1527  			if !ok {
  1528  				klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
  1529  				continue
  1530  			}
  1531  
  1532  			endpointChain := epInfo.chainName
  1533  
  1534  			// Handle traffic that loops back to the originator with SNAT.
  1535  			tx.Add(&knftables.Rule{
  1536  				Chain: endpointChain,
  1537  				Rule: knftables.Concat(
  1538  					ipX, "saddr", epInfo.IP(),
  1539  					"jump", markMasqChain,
  1540  				),
  1541  			})
  1542  
  1543  			// Handle session affinity
  1544  			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1545  				tx.Add(&knftables.Rule{
  1546  					Chain: endpointChain,
  1547  					Rule: knftables.Concat(
  1548  						"update", "@", epInfo.affinitySetName,
  1549  						"{", ipX, "saddr", "}",
  1550  					),
  1551  				})
  1552  			}
  1553  
  1554  			// DNAT to final destination.
  1555  			tx.Add(&knftables.Rule{
  1556  				Chain: endpointChain,
  1557  				Rule: knftables.Concat(
  1558  					"meta l4proto", protocol,
  1559  					"dnat to", epInfo.String(),
  1560  				),
  1561  			})
  1562  		}
  1563  	}
  1564  
  1565  	// Figure out which chains are now stale. Unfortunately, we can't delete them
  1566  	// right away, because with kernels before 6.2, if there is a map element pointing
  1567  	// to a chain, and you delete that map element, the kernel doesn't notice until a
  1568  	// short amount of time later that the chain is now unreferenced. So we flush them
  1569  	// now, and record the time that they become stale in staleChains so they can be
  1570  	// deleted later.
  1571  	existingChains, err := proxier.nftables.List(context.TODO(), "chains")
  1572  	if err == nil {
  1573  		for _, chain := range existingChains {
  1574  			if isServiceChainName(chain) {
  1575  				if !activeChains.Has(chain) {
  1576  					tx.Flush(&knftables.Chain{
  1577  						Name: chain,
  1578  					})
  1579  					proxier.staleChains[chain] = start
  1580  				} else {
  1581  					delete(proxier.staleChains, chain)
  1582  				}
  1583  			}
  1584  		}
  1585  	} else if !knftables.IsNotFound(err) {
  1586  		klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted")
  1587  	}
  1588  
  1589  	// OTOH, we can immediately delete any stale affinity sets
  1590  	existingSets, err := proxier.nftables.List(context.TODO(), "sets")
  1591  	if err == nil {
  1592  		for _, set := range existingSets {
  1593  			if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
  1594  				tx.Delete(&knftables.Set{
  1595  					Name: set,
  1596  				})
  1597  			}
  1598  		}
  1599  	} else if !knftables.IsNotFound(err) {
  1600  		klog.ErrorS(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
  1601  	}
  1602  
  1603  	// Sync rules.
  1604  	klog.V(2).InfoS("Reloading service nftables data",
  1605  		"numServices", len(proxier.svcPortMap),
  1606  		"numEndpoints", totalEndpoints,
  1607  	)
  1608  
  1609  	// FIXME
  1610  	// klog.V(9).InfoS("Running nftables transaction", "transaction", tx.Bytes())
  1611  
  1612  	err = proxier.nftables.Run(context.TODO(), tx)
  1613  	if err != nil {
  1614  		klog.ErrorS(err, "nftables sync failed")
  1615  		metrics.IptablesRestoreFailuresTotal.Inc()
  1616  		return
  1617  	}
  1618  	success = true
  1619  
  1620  	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
  1621  		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
  1622  			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
  1623  			metrics.NetworkProgrammingLatency.Observe(latency)
  1624  			klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
  1625  		}
  1626  	}
  1627  
  1628  	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
  1629  	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
  1630  	if proxier.healthzServer != nil {
  1631  		proxier.healthzServer.Updated(proxier.ipFamily)
  1632  	}
  1633  	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
  1634  
  1635  	// Update service healthchecks.  The endpoints list might include services that are
  1636  	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
  1637  	// will just drop those endpoints.
  1638  	if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
  1639  		klog.ErrorS(err, "Error syncing healthcheck services")
  1640  	}
  1641  	if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
  1642  		klog.ErrorS(err, "Error syncing healthcheck endpoints")
  1643  	}
  1644  
  1645  	// Finish housekeeping, clear stale conntrack entries for UDP Services
  1646  	conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
  1647  }
  1648  
  1649  func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
  1650  	// First write session affinity rules, if applicable.
  1651  	if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1652  		ipX := "ip"
  1653  		if proxier.ipFamily == v1.IPv6Protocol {
  1654  			ipX = "ip6"
  1655  		}
  1656  
  1657  		for _, ep := range endpoints {
  1658  			epInfo, ok := ep.(*endpointInfo)
  1659  			if !ok {
  1660  				continue
  1661  			}
  1662  
  1663  			tx.Add(&knftables.Rule{
  1664  				Chain: svcChain,
  1665  				Rule: knftables.Concat(
  1666  					ipX, "saddr", "@", epInfo.affinitySetName,
  1667  					"goto", epInfo.chainName,
  1668  				),
  1669  			})
  1670  		}
  1671  	}
  1672  
  1673  	// Now write loadbalancing rule
  1674  	var elements []string
  1675  	for i, ep := range endpoints {
  1676  		epInfo, ok := ep.(*endpointInfo)
  1677  		if !ok {
  1678  			continue
  1679  		}
  1680  
  1681  		elements = append(elements,
  1682  			strconv.Itoa(i), ":", "goto", epInfo.chainName,
  1683  		)
  1684  		if i != len(endpoints)-1 {
  1685  			elements = append(elements, ",")
  1686  		}
  1687  	}
  1688  	tx.Add(&knftables.Rule{
  1689  		Chain: svcChain,
  1690  		Rule: knftables.Concat(
  1691  			"numgen random mod", len(endpoints), "vmap",
  1692  			"{", elements, "}",
  1693  		),
  1694  	})
  1695  }
  1696  

View as plain text