//go:build linux // +build linux /* Copyright 2015 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package nftables import ( "fmt" "net" "reflect" "testing" "time" "github.com/lithammer/dedent" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metrics" proxyutil "k8s.io/kubernetes/pkg/proxy/util" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" "sigs.k8s.io/knftables" ) // Conventions for tests using NewFakeProxier: // // Pod IPs: // Service ClusterIPs: // Node IPs: // Local Node IP: // Service ExternalIPs: // LoadBalancer IPs:,, // Non-cluster IPs: // LB Source Range: const testHostname = "test-hostname" const testNodeIP = "" const testNodeIPAlt = "" const testExternalIP = "" const testNodeIPv6 = "2001:db8::1" const testNodeIPv6Alt = "2001:db8:1::2" const testExternalClient = "" const testExternalClientBlocked = "" var testNodeIPs = []string{testNodeIP, testNodeIPAlt, testExternalIP, testNodeIPv6, testNodeIPv6Alt} func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. nftablesFamily := knftables.IPv4Family podCIDR := "" serviceCIDRs := "" if ipFamily == v1.IPv6Protocol { nftablesFamily = knftables.IPv6Family podCIDR = "fd00:10::/64" serviceCIDRs = "fd00:10:96::/112" } detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR(podCIDR) networkInterfacer := proxyutiltest.NewFakeNetwork() itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0} addrs := []net.Addr{ &net.IPNet{IP: netutils.ParseIPSloppy(""), Mask: net.CIDRMask(8, 32)}, &net.IPNet{IP: netutils.ParseIPSloppy("::1/128"), Mask: net.CIDRMask(128, 128)}, } networkInterfacer.AddInterfaceAddr(&itf, addrs) itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0} addrs1 := []net.Addr{ &net.IPNet{IP: netutils.ParseIPSloppy(testNodeIP), Mask: net.CIDRMask(24, 32)}, &net.IPNet{IP: netutils.ParseIPSloppy(testNodeIPAlt), Mask: net.CIDRMask(24, 32)}, &net.IPNet{IP: netutils.ParseIPSloppy(testExternalIP), Mask: net.CIDRMask(24, 32)}, &net.IPNet{IP: netutils.ParseIPSloppy(testNodeIPv6), Mask: net.CIDRMask(64, 128)}, &net.IPNet{IP: netutils.ParseIPSloppy(testNodeIPv6Alt), Mask: net.CIDRMask(64, 128)}, } networkInterfacer.AddInterfaceAddr(&itf1, addrs1) nft := knftables.NewFake(nftablesFamily, kubeProxyTable) var nodeIP net.IP if ipFamily == v1.IPv4Protocol { nodeIP = netutils.ParseIPSloppy(testNodeIP) } else { nodeIP = netutils.ParseIPSloppy(testNodeIPv6) } p := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil), nftables: nft, masqueradeMark: "0x4000", conntrack: conntrack.NewFake(), localDetector: detectLocal, hostname: testHostname, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), nodeIP: nodeIP, nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil, nodeIP), networkInterfacer: networkInterfacer, staleChains: make(map[string]time.Time), serviceCIDRs: serviceCIDRs, } p.setInitialized(true) p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) return nft, p } // TestOverallNFTablesRules creates a variety of services and verifies that the generated // rules are exactly as expected. func TestOverallNFTablesRules(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) metrics.RegisterMetrics() makeServiceMap(fp, // create ClusterIP service makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, }} }), // create LoadBalancer service with Local traffic policy makeTestService("ns2", "svc2", func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 3001, }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: "", }} svc.Spec.ExternalIPs = []string{""} svc.Spec.HealthCheckNodePort = 30000 }), // create NodePort service makeTestService("ns3", "svc3", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 3003, }} }), // create ExternalIP service makeTestService("ns4", "svc4", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "" svc.Spec.ExternalIPs = []string{""} svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80), }} }), // create LoadBalancer service with Cluster traffic policy, source ranges, // and session affinity makeTestService("ns5", "svc5", func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 3002, }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: "", }} svc.Spec.HealthCheckNodePort = 30000 // Extra whitespace to ensure that invalid value will not result // in a crash, for backward compatibility. svc.Spec.LoadBalancerSourceRanges = []string{""} svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{ ClientIP: &v1.ClientIPConfig{ TimeoutSeconds: ptr.To[int32](10800), }, } }), // create ClusterIP service with no endpoints makeTestService("ns6", "svc6", func(svc *v1.Service) { svc.Spec.Type = "ClusterIP" svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80), }} }), ) populateEndpointSlices(fp, // create ClusterIP service endpoints makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), // create Local LoadBalancer endpoints. Note that since we aren't setting // its NodeName, this endpoint will be considered non-local and ignored. makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), // create NodePort service endpoints makeTestEndpointSlice("ns3", "svc3", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), // create ExternalIP service endpoints makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }, { Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), // create Cluster LoadBalancer endpoints makeTestEndpointSlice("ns5", "svc5", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() expected := dedent.Dedent(` add table ip kube-proxy { comment "rules for kube-proxy" ; } add chain ip kube-proxy mark-for-masquerade add rule ip kube-proxy mark-for-masquerade mark set mark or 0x4000 add chain ip kube-proxy masquerading add rule ip kube-proxy masquerading mark and 0x4000 == 0 return add rule ip kube-proxy masquerading mark set mark xor 0x4000 add rule ip kube-proxy masquerading masquerade fully-random add chain ip kube-proxy services add chain ip kube-proxy service-endpoints-check add rule ip kube-proxy service-endpoints-check ip daddr . meta l4proto . th dport vmap @no-endpoint-services add chain ip kube-proxy filter-prerouting { type filter hook prerouting priority -110 ; } add rule ip kube-proxy filter-prerouting ct state new jump firewall-check add chain ip kube-proxy filter-forward { type filter hook forward priority -110 ; } add rule ip kube-proxy filter-forward ct state new jump service-endpoints-check add rule ip kube-proxy filter-forward ct state new jump cluster-ips-check add chain ip kube-proxy filter-input { type filter hook input priority -110 ; } add rule ip kube-proxy filter-input ct state new jump nodeport-endpoints-check add rule ip kube-proxy filter-input ct state new jump service-endpoints-check add chain ip kube-proxy filter-output { type filter hook output priority -110 ; } add rule ip kube-proxy filter-output ct state new jump service-endpoints-check add rule ip kube-proxy filter-output ct state new jump firewall-check add chain ip kube-proxy filter-output-post-dnat { type filter hook output priority -90 ; } add rule ip kube-proxy filter-output-post-dnat ct state new jump cluster-ips-check add chain ip kube-proxy nat-output { type nat hook output priority -100 ; } add rule ip kube-proxy nat-output jump services add chain ip kube-proxy nat-postrouting { type nat hook postrouting priority 100 ; } add rule ip kube-proxy nat-postrouting jump masquerading add chain ip kube-proxy nat-prerouting { type nat hook prerouting priority -100 ; } add rule ip kube-proxy nat-prerouting jump services add chain ip kube-proxy nodeport-endpoints-check add rule ip kube-proxy nodeport-endpoints-check ip daddr @nodeport-ips meta l4proto . th dport vmap @no-endpoint-nodeports add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; } add chain ip kube-proxy cluster-ips-check add rule ip kube-proxy cluster-ips-check ip daddr @cluster-ips reject comment "Reject traffic to invalid ports of ClusterIPs" add rule ip kube-proxy cluster-ips-check ip daddr { } drop comment "Drop traffic to unallocated ClusterIPs" add set ip kube-proxy nodeport-ips { type ipv4_addr ; comment "IPs that accept NodePort traffic" ; } add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; } add chain ip kube-proxy firewall-check add rule ip kube-proxy firewall-check ip daddr . meta l4proto . th dport vmap @firewall-ips add chain ip kube-proxy reject-chain { comment "helper for @no-endpoint-services / @no-endpoint-nodeports" ; } add rule ip kube-proxy reject-chain reject add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; } add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; } add map ip kube-proxy service-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "ClusterIP, ExternalIP and LoadBalancer IP traffic" ; } add map ip kube-proxy service-nodeports { type inet_proto . inet_service : verdict ; comment "NodePort traffic" ; } add rule ip kube-proxy services ip daddr . meta l4proto . th dport vmap @service-ips add rule ip kube-proxy services ip daddr @nodeport-ips meta l4proto . th dport vmap @service-nodeports add element ip kube-proxy nodeport-ips { } # svc1 add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 } add chain ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 add rule ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 meta l4proto tcp dnat to add element ip kube-proxy cluster-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } # svc2 add chain ip kube-proxy service-42NFTM6N-ns2/svc2/tcp/p80 add rule ip kube-proxy service-42NFTM6N-ns2/svc2/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-42NFTM6N-ns2/svc2/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 } add chain ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 add rule ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 ip saddr goto service-42NFTM6N-ns2/svc2/tcp/p80 comment "short-circuit pod traffic" add rule ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 fib saddr type local jump mark-for-masquerade comment "masquerade local traffic" add rule ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 fib saddr type local goto service-42NFTM6N-ns2/svc2/tcp/p80 comment "short-circuit local traffic" add chain ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 add rule ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 meta l4proto tcp dnat to add element ip kube-proxy cluster-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-42NFTM6N-ns2/svc2/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto external-42NFTM6N-ns2/svc2/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto external-42NFTM6N-ns2/svc2/tcp/p80 } add element ip kube-proxy service-nodeports { tcp . 3001 : goto external-42NFTM6N-ns2/svc2/tcp/p80 } add element ip kube-proxy no-endpoint-nodeports { tcp . 3001 comment "ns2/svc2:p80" : drop } add element ip kube-proxy no-endpoint-services { . tcp . 80 comment "ns2/svc2:p80" : drop } add element ip kube-proxy no-endpoint-services { . tcp . 80 comment "ns2/svc2:p80" : drop } # svc3 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 } add chain ip kube-proxy external-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy external-4AT6LBPK-ns3/svc3/tcp/p80 jump mark-for-masquerade add rule ip kube-proxy external-4AT6LBPK-ns3/svc3/tcp/p80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 add chain ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 add rule ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 meta l4proto tcp dnat to add element ip kube-proxy cluster-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add element ip kube-proxy service-nodeports { tcp . 3003 : goto external-4AT6LBPK-ns3/svc3/tcp/p80 } # svc4 add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 , 1 : goto endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 } add chain ip kube-proxy external-LAUZTJTB-ns4/svc4/tcp/p80 add rule ip kube-proxy external-LAUZTJTB-ns4/svc4/tcp/p80 jump mark-for-masquerade add rule ip kube-proxy external-LAUZTJTB-ns4/svc4/tcp/p80 goto service-LAUZTJTB-ns4/svc4/tcp/p80 add chain ip kube-proxy endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 add rule ip kube-proxy endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 meta l4proto tcp dnat to add chain ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 add rule ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 meta l4proto tcp dnat to add element ip kube-proxy cluster-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto external-LAUZTJTB-ns4/svc4/tcp/p80 } # svc5 add set ip kube-proxy affinity-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 { type ipv4_addr ; flags dynamic,timeout ; timeout 10800s ; } add chain ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 add rule ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 ip saddr @affinity-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 goto endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 add rule ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 } add chain ip kube-proxy external-HVFWP5L3-ns5/svc5/tcp/p80 add rule ip kube-proxy external-HVFWP5L3-ns5/svc5/tcp/p80 jump mark-for-masquerade add rule ip kube-proxy external-HVFWP5L3-ns5/svc5/tcp/p80 goto service-HVFWP5L3-ns5/svc5/tcp/p80 add chain ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 add rule ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 update @affinity-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 { ip saddr } add rule ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 meta l4proto tcp dnat to add chain ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80 add rule ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80 ip saddr != { } drop add element ip kube-proxy cluster-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-HVFWP5L3-ns5/svc5/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto external-HVFWP5L3-ns5/svc5/tcp/p80 } add element ip kube-proxy service-nodeports { tcp . 3002 : goto external-HVFWP5L3-ns5/svc5/tcp/p80 } add element ip kube-proxy firewall-ips { . tcp . 80 comment "ns5/svc5:p80" : goto firewall-HVFWP5L3-ns5/svc5/tcp/p80 } # svc6 add element ip kube-proxy cluster-ips { } add element ip kube-proxy no-endpoint-services { . tcp . 80 comment "ns6/svc6:p80" : goto reject-chain } `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) } // TestNoEndpointsReject tests that a service with no endpoints rejects connections to // its ClusterIP, ExternalIPs, NodePort, and LoadBalancer IP. func TestNoEndpointsReject(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) svcIP := "" svcPort := 80 svcNodePort := 3001 svcExternalIPs := "" svcLBIP := "" svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ClusterIP = svcIP svc.Spec.ExternalIPs = []string{svcExternalIPs} svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Protocol: v1.ProtocolTCP, Port: int32(svcPort), NodePort: int32(svcNodePort), }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: svcLBIP, }} }), ) fp.syncProxyRules() runPacketFlowTests(t, getLine(), nft, testNodeIPs, []packetFlowTest{ { name: "pod to cluster IP with no endpoints", sourceIP: "", destIP: svcIP, destPort: svcPort, output: "REJECT", }, { name: "external to external IP with no endpoints", sourceIP: testExternalClient, destIP: svcExternalIPs, destPort: svcPort, output: "REJECT", }, { name: "pod to NodePort with no endpoints", sourceIP: "", destIP: testNodeIP, destPort: svcNodePort, output: "REJECT", }, { name: "external to NodePort with no endpoints", sourceIP: testExternalClient, destIP: testNodeIP, destPort: svcNodePort, output: "REJECT", }, { name: "pod to LoadBalancer IP with no endpoints", sourceIP: "", destIP: svcLBIP, destPort: svcPort, output: "REJECT", }, { name: "external to LoadBalancer IP with no endpoints", sourceIP: testExternalClient, destIP: svcLBIP, destPort: svcPort, output: "REJECT", }, }) } // TestClusterIPGeneral tests various basic features of a ClusterIP service func TestClusterIPGeneral(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) makeServiceMap(fp, makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "http", Port: 80, Protocol: v1.ProtocolTCP, }} }), makeTestService("ns2", "svc2", func(svc *v1.Service) { svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{ { Name: "http", Port: 80, Protocol: v1.ProtocolTCP, }, { Name: "https", Port: 443, Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(8443), }, { Name: "dns-udp", Port: 53, Protocol: v1.ProtocolUDP, }, { Name: "dns-tcp", Port: 53, Protocol: v1.ProtocolTCP, // We use TargetPort on TCP but not UDP/SCTP to // help disambiguate the output. TargetPort: intstr.FromInt32(5353), }, { Name: "dns-sctp", Port: 53, Protocol: v1.ProtocolSCTP, }, } }), ) populateEndpointSlices(fp, makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("http"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{ { Addresses: []string{""}, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, NodeName: ptr.To("host2"), }, } eps.Ports = []discovery.EndpointPort{ { Name: ptr.To("http"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }, { Name: ptr.To("https"), Port: ptr.To[int32](8443), Protocol: ptr.To(v1.ProtocolTCP), }, { Name: ptr.To("dns-udp"), Port: ptr.To[int32](53), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("dns-tcp"), Port: ptr.To[int32](5353), Protocol: ptr.To(v1.ProtocolTCP), }, { Name: ptr.To("dns-sctp"), Port: ptr.To[int32](53), Protocol: ptr.To(v1.ProtocolSCTP), }, } }), ) fp.syncProxyRules() runPacketFlowTests(t, getLine(), nft, testNodeIPs, []packetFlowTest{ { name: "simple clusterIP", sourceIP: "", destIP: "", destPort: 80, output: "", masq: false, }, { name: "hairpin to cluster IP", sourceIP: "", destIP: "", destPort: 80, output: "", masq: true, }, { name: "clusterIP with multiple endpoints", sourceIP: "", destIP: "", destPort: 80, output: ",", masq: false, }, { name: "clusterIP with TargetPort", sourceIP: "", destIP: "", destPort: 443, output: ",", masq: false, }, { name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)", sourceIP: "", protocol: v1.ProtocolTCP, destIP: "", destPort: 53, output: ",", masq: false, }, { name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)", sourceIP: "", protocol: v1.ProtocolUDP, destIP: "", destPort: 53, output: ",", masq: false, }, { name: "clusterIP with TCP, UDP, and SCTP on same port (SCTP)", sourceIP: "", protocol: v1.ProtocolSCTP, destIP: "", destPort: 53, output: ",", masq: false, }, { name: "TCP-only port does not match UDP traffic", sourceIP: "", protocol: v1.ProtocolUDP, destIP: "", destPort: 80, output: "REJECT", }, { name: "svc1 does not accept svc2's ports", sourceIP: "", destIP: "", destPort: 443, output: "REJECT", }, { name: "packet to unallocated cluster ip", sourceIP: "", destIP: "", destPort: 80, output: "DROP", }, }) } func TestLoadBalancer(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) svcIP := "" svcPort := 80 svcNodePort := 3001 svcLBIP1 := "" svcLBIP2 := "" svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", Protocol: v1.ProtocolTCP, } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{ {IP: svcLBIP1}, {IP: svcLBIP2}, } svc.Spec.LoadBalancerSourceRanges = []string{ "", // Regression test that excess whitespace gets ignored "", } }), ) epIP := "" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() runPacketFlowTests(t, getLine(), nft, testNodeIPs, []packetFlowTest{ { name: "pod to cluster IP", sourceIP: "", destIP: svcIP, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: false, }, { name: "external to nodePort", sourceIP: testExternalClient, destIP: testNodeIP, destPort: svcNodePort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: true, }, { name: "nodePort bypasses LoadBalancerSourceRanges", sourceIP: testExternalClientBlocked, destIP: testNodeIP, destPort: svcNodePort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: true, }, { name: "accepted external to LB1", sourceIP: testExternalClient, destIP: svcLBIP1, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: true, }, { name: "accepted external to LB2", sourceIP: testExternalClient, destIP: svcLBIP2, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: true, }, { name: "blocked external to LB1", sourceIP: testExternalClientBlocked, destIP: svcLBIP1, destPort: svcPort, output: "DROP", }, { name: "blocked external to LB2", sourceIP: testExternalClientBlocked, destIP: svcLBIP2, destPort: svcPort, output: "DROP", }, { name: "pod to LB1 (blocked by LoadBalancerSourceRanges)", sourceIP: "", destIP: svcLBIP1, destPort: svcPort, output: "DROP", }, { name: "pod to LB2 (blocked by LoadBalancerSourceRanges)", sourceIP: "", destIP: svcLBIP2, destPort: svcPort, output: "DROP", }, { name: "node to LB1 (allowed by LoadBalancerSourceRanges)", sourceIP: testNodeIP, destIP: svcLBIP1, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: true, }, { name: "node to LB2 (allowed by LoadBalancerSourceRanges)", sourceIP: testNodeIP, destIP: svcLBIP2, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: true, }, // The LB rules assume that when you connect from a node to a LB IP, that // something external to kube-proxy will cause the connection to be // SNATted to the LB IP, so if the LoadBalancerSourceRanges include the // node IP, then we add a rule allowing traffic from the LB IP as well... { name: "same node to LB1, SNATted to LB1 (implicitly allowed)", sourceIP: svcLBIP1, destIP: svcLBIP1, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: true, }, { name: "same node to LB2, SNATted to LB2 (implicitly allowed)", sourceIP: svcLBIP2, destIP: svcLBIP2, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP, svcPort), masq: true, }, }) } // TestNodePorts tests NodePort services under various combinations of the // --nodeport-addresses flags. func TestNodePorts(t *testing.T) { testCases := []struct { name string family v1.IPFamily nodePortAddresses []string // allowAltNodeIP is true if we expect NodePort traffic on the alternate // node IP to be accepted allowAltNodeIP bool }{ { name: "ipv4", family: v1.IPv4Protocol, nodePortAddresses: nil, allowAltNodeIP: false, }, { name: "ipv4, multiple nodeport-addresses", family: v1.IPv4Protocol, nodePortAddresses: []string{"", "", "2001:db8::/64"}, allowAltNodeIP: true, }, { name: "ipv6", family: v1.IPv6Protocol, nodePortAddresses: nil, allowAltNodeIP: false, }, { name: "ipv6, multiple nodeport-addresses", family: v1.IPv6Protocol, nodePortAddresses: []string{"", "", "2001:db8::/64", "2001:db8:1::2/128"}, allowAltNodeIP: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { nft, fp := NewFakeProxier(tc.family) var svcIP, epIP1, epIP2 string var nodeIP string if tc.family == v1.IPv4Protocol { svcIP = "" epIP1 = "" epIP2 = "" nodeIP = testNodeIP } else { svcIP = "fd00:172:30::41" epIP1 = "fd00:10:180::1" epIP2 = "fd00:10:180::2:1" nodeIP = testNodeIPv6 } if tc.nodePortAddresses != nil { fp.nodePortAddresses = proxyutil.NewNodePortAddresses(tc.family, tc.nodePortAddresses, netutils.ParseIPSloppy(nodeIP)) } makeServiceMap(fp, makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeNodePort svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 3001, }} }), ) populateEndpointSlices(fp, makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { if tc.family == v1.IPv4Protocol { eps.AddressType = discovery.AddressTypeIPv4 } else { eps.AddressType = discovery.AddressTypeIPv6 } eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP1}, NodeName: nil, }, { Addresses: []string{epIP2}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() var podIP, externalClientIP, altNodeIP string if tc.family == v1.IPv4Protocol { podIP = "" externalClientIP = testExternalClient altNodeIP = testNodeIPAlt } else { podIP = "fd00:10::2" externalClientIP = "2600:5200::1" altNodeIP = testNodeIPv6Alt } output := net.JoinHostPort(epIP1, "80") + ", " + net.JoinHostPort(epIP2, "80") // Basic tests are the same for all cases runPacketFlowTests(t, getLine(), nft, testNodeIPs, []packetFlowTest{ { name: "pod to cluster IP", sourceIP: podIP, destIP: svcIP, destPort: 80, output: output, masq: false, }, { name: "external to nodePort", sourceIP: externalClientIP, destIP: nodeIP, destPort: 3001, output: output, masq: true, }, { name: "node to nodePort", sourceIP: nodeIP, destIP: nodeIP, destPort: 3001, output: output, masq: true, }, }) if tc.allowAltNodeIP { runPacketFlowTests(t, getLine(), nft, testNodeIPs, []packetFlowTest{ { name: "external to nodePort on secondary IP", sourceIP: externalClientIP, destIP: altNodeIP, destPort: 3001, output: output, masq: true, }, }) } else { runPacketFlowTests(t, getLine(), nft, testNodeIPs, []packetFlowTest{ { name: "secondary nodeIP ignores NodePorts", sourceIP: externalClientIP, destIP: altNodeIP, destPort: 3001, output: "", }, }) } }) } } // TestExternalTrafficPolicyLocal tests that traffic to externally-facing IPs does not get // masqueraded when using Local traffic policy. For traffic from external sources, that // means it can also only be routed to local endpoints, but for traffic from internal // sources, it gets routed to all endpoints. func TestExternalTrafficPolicyLocal(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) svcIP := "" svcPort := 80 svcNodePort := 3001 svcHealthCheckNodePort := 30000 svcExternalIPs := "" svcLBIP := "" svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal svc.Spec.ClusterIP = svcIP svc.Spec.ExternalIPs = []string{svcExternalIPs} svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), TargetPort: intstr.FromInt32(int32(svcPort)), }} svc.Spec.HealthCheckNodePort = int32(svcHealthCheckNodePort) svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: svcLBIP, }} }), ) epIP1 := "" epIP2 := "" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP1}, }, { Addresses: []string{epIP2}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() runPacketFlowTests(t, getLine(), nft, testNodeIPs, []packetFlowTest{ { name: "pod to cluster IP hits both endpoints, unmasqueraded", sourceIP: "", destIP: svcIP, destPort: svcPort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: false, }, { name: "pod to external IP hits both endpoints, unmasqueraded", sourceIP: "", destIP: svcExternalIPs, destPort: svcPort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: false, }, { name: "external to external IP hits only local endpoint, unmasqueraded", sourceIP: testExternalClient, destIP: svcExternalIPs, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP2, svcPort), masq: false, }, { name: "pod to LB IP hits only both endpoints, unmasqueraded", sourceIP: "", destIP: svcLBIP, destPort: svcPort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: false, }, { name: "external to LB IP hits only local endpoint, unmasqueraded", sourceIP: testExternalClient, destIP: svcLBIP, destPort: svcPort, output: fmt.Sprintf("%s:%d", epIP2, svcPort), masq: false, }, { name: "pod to NodePort hits both endpoints, unmasqueraded", sourceIP: "", destIP: testNodeIP, destPort: svcNodePort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: false, }, { name: "external to NodePort hits only local endpoint, unmasqueraded", sourceIP: testExternalClient, destIP: testNodeIP, destPort: svcNodePort, output: fmt.Sprintf("%s:%d", epIP2, svcPort), masq: false, }, }) } // TestExternalTrafficPolicyCluster tests that traffic to an externally-facing IP gets // masqueraded when using Cluster traffic policy. func TestExternalTrafficPolicyCluster(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) svcIP := "" svcPort := 80 svcNodePort := 3001 svcExternalIPs := "" svcLBIP := "" svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ClusterIP = svcIP svc.Spec.ExternalIPs = []string{svcExternalIPs} svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), TargetPort: intstr.FromInt32(int32(svcPort)), }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: svcLBIP, }} svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster }), ) epIP1 := "" epIP2 := "" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP1}, NodeName: nil, }, { Addresses: []string{epIP2}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() runPacketFlowTests(t, getLine(), nft, testNodeIPs, []packetFlowTest{ { name: "pod to cluster IP hits both endpoints, unmasqueraded", sourceIP: "", destIP: svcIP, destPort: svcPort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: false, }, { name: "pod to external IP hits both endpoints, masqueraded", sourceIP: "", destIP: svcExternalIPs, destPort: svcPort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: true, }, { name: "external to external IP hits both endpoints, masqueraded", sourceIP: testExternalClient, destIP: svcExternalIPs, destPort: svcPort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: true, }, { name: "pod to LB IP hits both endpoints, masqueraded", sourceIP: "", destIP: svcLBIP, destPort: svcPort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: true, }, { name: "external to LB IP hits both endpoints, masqueraded", sourceIP: testExternalClient, destIP: svcLBIP, destPort: svcPort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: true, }, { name: "pod to NodePort hits both endpoints, masqueraded", sourceIP: "", destIP: testNodeIP, destPort: svcNodePort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: true, }, { name: "external to NodePort hits both endpoints, masqueraded", sourceIP: testExternalClient, destIP: testNodeIP, destPort: svcNodePort, output: fmt.Sprintf("%s:%d, %s:%d", epIP1, svcPort, epIP2, svcPort), masq: true, }, }) } func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service { svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, Annotations: map[string]string{}, }, Spec: v1.ServiceSpec{}, Status: v1.ServiceStatus{}, } svcFunc(svc) return svc } func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort { svcPort := v1.ServicePort{ Name: name, Protocol: protocol, Port: port, NodePort: nodeport, TargetPort: intstr.FromInt32(int32(targetPort)), } return append(array, svcPort) } func TestBuildServiceMapAddRemove(t *testing.T) { _, fp := NewFakeProxier(v1.IPv4Protocol) services := []*v1.Service{ makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpport", "SCTP", 1236, 6321, 0) }), makeTestService("somewhere-else", "node-port", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeNodePort svc.Spec.ClusterIP = "" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "muchmoreblah", "SCTP", 343, 676, 0) }), makeTestService("somewhere", "load-balancer", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ClusterIP = "" svc.Spec.LoadBalancerIP = "" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001) svc.Status.LoadBalancer = v1.LoadBalancerStatus{ Ingress: []v1.LoadBalancerIngress{ {IP: ""}, }, } }), makeTestService("somewhere", "only-local-load-balancer", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ClusterIP = "" svc.Spec.LoadBalancerIP = "" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003) svc.Status.LoadBalancer = v1.LoadBalancerStatus{ Ingress: []v1.LoadBalancerIngress{ {IP: ""}, }, } svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal svc.Spec.HealthCheckNodePort = 345 }), } for i := range services { fp.OnServiceAdd(services[i]) } result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 10 { t.Errorf("expected service map length 10, got %v", fp.svcPortMap) } if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // The only-local-loadbalancer ones get added healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 1 { t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts) } else { nsn := makeNSN("somewhere", "only-local-load-balancer") if port, found := healthCheckNodePorts[nsn]; !found || port != 345 { t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts) } } // Remove some stuff // oneService is a modification of services[0] with removed first port. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0) }) fp.OnServiceUpdate(services[0], oneService) fp.OnServiceDelete(services[1]) fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. expectedStaleUDPServices := []string{"", "", "", ""} if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) { t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.UnsortedList()) } for _, ip := range expectedStaleUDPServices { if !result.DeletedUDPClusterIPs.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts) } } func TestBuildServiceMapServiceHeadless(t *testing.T) { _, fp := NewFakeProxier(v1.IPv4Protocol) makeServiceMap(fp, makeTestService("somewhere-else", "headless", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = v1.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) }), makeTestService("somewhere-else", "headless-without-port", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = v1.ClusterIPNone }), ) // Headless service should be ignored result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // No proxied services, so no healthchecks healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts)) } } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { _, fp := NewFakeProxier(v1.IPv4Protocol) makeServiceMap(fp, makeTestService("somewhere-else", "external-name", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeExternalName svc.Spec.ClusterIP = "" // Should be ignored svc.Spec.ExternalName = "foo2.bar.com" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0) }), ) result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs) } // No proxied services, so no healthchecks healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) } } func TestBuildServiceMapServiceUpdate(t *testing.T) { _, fp := NewFakeProxier(v1.IPv4Protocol) servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0) }) servicev2 := makeTestService("somewhere", "some-service", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ClusterIP = "" svc.Spec.LoadBalancerIP = "" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003) svc.Status.LoadBalancer = v1.LoadBalancerStatus{ Ingress: []v1.LoadBalancerIngress{ {IP: ""}, }, } svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal svc.Spec.HealthCheckNodePort = 345 }) fp.OnServiceAdd(servicev1) result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) } // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList()) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 1 { t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) } // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList()) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 1 { t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) } // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) } } func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) { for i := range allEndpointSlices { proxier.OnEndpointSliceAdd(allEndpointSlices[i]) } } func makeTestEndpointSlice(namespace, name string, sliceNum int, epsFunc func(*discovery.EndpointSlice)) *discovery.EndpointSlice { eps := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%d", name, sliceNum), Namespace: namespace, Labels: map[string]string{discovery.LabelServiceName: name}, }, } epsFunc(eps) return eps } func makeNSN(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } func makeServicePortName(ns, name, port string, protocol v1.Protocol) proxy.ServicePortName { return proxy.ServicePortName{ NamespacedName: makeNSN(ns, name), Port: port, Protocol: protocol, } } func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) { for i := range allServices { proxier.OnServiceAdd(allServices[i]) } proxier.mu.Lock() defer proxier.mu.Unlock() proxier.servicesSynced = true } type endpointExpectation struct { endpoint string isLocal bool } func checkEndpointExpectations(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]endpointExpectation) { if len(newMap) != len(expected) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) } for x := range expected { if len(newMap[x]) != len(expected[x]) { t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) } else { for i := range expected[x] { newEp := newMap[x][i] if newEp.String() != expected[x][i].endpoint || newEp.IsLocal() != expected[x][i].isLocal { t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp) } } } } } func TestUpdateEndpointsMap(t *testing.T) { emptyEndpointSlices := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, func(*discovery.EndpointSlice) {}), } subset1 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }} } subset2 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} } namedPortLocal := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }} }), } namedPort := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subset1), } namedPortRenamed := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11-2"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }} }), } namedPortRenumbered := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](22), Protocol: ptr.To(v1.ProtocolUDP), }} }), } namedPortsLocalNoLocal := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }, { Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} }), } multipleSubsets := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subset1), makeTestEndpointSlice("ns1", "ep1", 2, subset2), } subsetLocal := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} } multipleSubsetsWithLocal := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subset1), makeTestEndpointSlice("ns1", "ep1", 2, subsetLocal), } subsetMultiplePortsLocal := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} } subset3 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p13"), Port: ptr.To[int32](13), Protocol: ptr.To(v1.ProtocolUDP), }} } multipleSubsetsMultiplePortsLocal := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subsetMultiplePortsLocal), makeTestEndpointSlice("ns1", "ep1", 2, subset3), } subsetMultipleIPsPorts1 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }, { Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} } subsetMultipleIPsPorts2 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }, { Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p13"), Port: ptr.To[int32](13), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p14"), Port: ptr.To[int32](14), Protocol: ptr.To(v1.ProtocolUDP), }} } subsetMultipleIPsPorts3 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }, { Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p21"), Port: ptr.To[int32](21), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p22"), Port: ptr.To[int32](22), Protocol: ptr.To(v1.ProtocolUDP), }} } multipleSubsetsIPsPorts := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subsetMultipleIPsPorts1), makeTestEndpointSlice("ns1", "ep1", 2, subsetMultipleIPsPorts2), makeTestEndpointSlice("ns2", "ep2", 1, subsetMultipleIPsPorts3), } complexSubset1 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p22"), Port: ptr.To[int32](22), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset2 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p23"), Port: ptr.To[int32](23), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset3 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p44"), Port: ptr.To[int32](44), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset4 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p45"), Port: ptr.To[int32](45), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset5 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }, { Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset6 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p122"), Port: ptr.To[int32](122), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset7 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p33"), Port: ptr.To[int32](33), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset8 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p44"), Port: ptr.To[int32](44), Protocol: ptr.To(v1.ProtocolUDP), }} } complexBefore := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subset1), nil, makeTestEndpointSlice("ns2", "ep2", 1, complexSubset1), makeTestEndpointSlice("ns2", "ep2", 2, complexSubset2), nil, makeTestEndpointSlice("ns4", "ep4", 1, complexSubset3), makeTestEndpointSlice("ns4", "ep4", 2, complexSubset4), } complexAfter := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, complexSubset5), makeTestEndpointSlice("ns1", "ep1", 2, complexSubset6), nil, nil, makeTestEndpointSlice("ns3", "ep3", 1, complexSubset7), makeTestEndpointSlice("ns4", "ep4", 1, complexSubset8), nil, } testCases := []struct { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. name string previousEndpoints []*discovery.EndpointSlice currentEndpoints []*discovery.EndpointSlice oldEndpoints map[proxy.ServicePortName][]endpointExpectation expectedResult map[proxy.ServicePortName][]endpointExpectation expectedDeletedUDPEndpoints []proxy.ServiceEndpoint expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool expectedLocalEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing name: "nothing", oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", previousEndpoints: namedPortLocal, currentEndpoints: namedPortLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[2]: no change, multiple subsets name: "no change, multiple subsets", previousEndpoints: multipleSubsets, currentEndpoints: multipleSubsets, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", previousEndpoints: multipleSubsetsMultiplePortsLocal, currentEndpoints: multipleSubsetsMultiplePortsLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[4]: no change, multiple endpoints, subsets, IPs, and ports name: "no change, multiple endpoints, subsets, IPs, and ports", previousEndpoints: multipleSubsetsIPsPorts, currentEndpoints: multipleSubsetsIPsPorts, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, }, { // Case[5]: add an Endpoints name: "add an Endpoints", previousEndpoints: []*discovery.EndpointSlice{nil}, currentEndpoints: namedPortLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[6]: remove an Endpoints name: "remove an Endpoints", previousEndpoints: namedPortLocal, currentEndpoints: []*discovery.EndpointSlice{nil}, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", previousEndpoints: namedPort, currentEndpoints: namedPortsLocalNoLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[8]: remove an IP and port name: "remove an IP and port", previousEndpoints: namedPortsLocalNoLocal, currentEndpoints: namedPort, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }, { Endpoint: "", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }, { Endpoint: "", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", previousEndpoints: []*discovery.EndpointSlice{namedPort[0], nil}, currentEndpoints: multipleSubsetsWithLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[10]: remove a subset name: "remove a subset", previousEndpoints: multipleSubsets, currentEndpoints: []*discovery.EndpointSlice{namedPort[0], nil}, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", previousEndpoints: namedPort, currentEndpoints: namedPortRenamed, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", previousEndpoints: namedPort, currentEndpoints: namedPortRenumbered, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", previousEndpoints: complexBefore, currentEndpoints: complexAfter, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, {endpoint: "", isLocal: true}, }, makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, {endpoint: "", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): { {endpoint: "", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "", ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), }, { Endpoint: "", ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), }, { Endpoint: "", ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP), }, { Endpoint: "", ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP), }, { Endpoint: "", ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, }, { // Case[14]: change from 0 endpoint address to 1 unnamed port name: "change from 0 endpoint address to 1 unnamed port", previousEndpoints: emptyEndpointSlices, currentEndpoints: namedPort, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{}, }, } for tci, tc := range testCases { t.Run(tc.name, func(t *testing.T) { _, fp := NewFakeProxier(v1.IPv4Protocol) fp.hostname = testHostname // First check that after adding all previous versions of endpoints, // the fp.oldEndpoints is as we expect. for i := range tc.previousEndpoints { if tc.previousEndpoints[i] != nil { fp.OnEndpointSliceAdd(tc.previousEndpoints[i]) } } fp.endpointsMap.Update(fp.endpointsChanges) checkEndpointExpectations(t, tci, fp.endpointsMap, tc.oldEndpoints) // Now let's call appropriate handlers to get to state we want to be. if len(tc.previousEndpoints) != len(tc.currentEndpoints) { t.Fatalf("[%d] different lengths of previous and current endpoints", tci) } for i := range tc.previousEndpoints { prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i] switch { case prev == nil: fp.OnEndpointSliceAdd(curr) case curr == nil: fp.OnEndpointSliceDelete(prev) default: fp.OnEndpointSliceUpdate(prev, curr) } } result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap checkEndpointExpectations(t, tci, newMap, tc.expectedResult) if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) } for _, x := range tc.expectedDeletedUDPEndpoints { found := false for _, stale := range result.DeletedUDPEndpoints { if stale == x { found = true break } } if !found { t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints) } } if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) { t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices) } for svcName := range tc.expectedNewlyActiveUDPServices { found := false for _, stale := range result.NewlyActiveUDPServices { if stale == svcName { found = true } } if !found { t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices) } } localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) { t.Errorf("[%d] expected local endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints) } }) } } // TestHealthCheckNodePortWhenTerminating tests that health check node ports are not enabled when all local endpoints are terminating func TestHealthCheckNodePortWhenTerminating(t *testing.T) { _, fp := NewFakeProxier(v1.IPv4Protocol) fp.OnServiceSynced() fp.OnEndpointSlicesSynced() serviceName := "svc1" namespaceName := "ns1" fp.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "", Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}}, }, }) endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{{ Addresses: []string{""}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(testHostname), }, { // not ready endpoints should be ignored Addresses: []string{""}, Conditions: discovery.EndpointConditions{Ready: ptr.To(false)}, NodeName: ptr.To(testHostname), }}, } fp.OnEndpointSliceAdd(endpointSlice) _ = fp.endpointsMap.Update(fp.endpointsChanges) localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() if len(localReadyEndpoints) != 1 { t.Errorf("unexpected number of local ready endpoints, expected 1 but got: %d", len(localReadyEndpoints)) } // set all endpoints to terminating endpointSliceTerminating := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{{ Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // not ready endpoints should be ignored Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }}, } fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating) _ = fp.endpointsMap.Update(fp.endpointsChanges) localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints() if len(localReadyEndpoints) != 0 { t.Errorf("unexpected number of local ready endpoints, expected 0 but got: %d", len(localReadyEndpoints)) } } // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. // This test ensures that the iptables proxier supports translating Endpoints to // iptables output when internalTrafficPolicy is specified func TestInternalTrafficPolicy(t *testing.T) { type endpoint struct { ip string hostname string } testCases := []struct { name string line string internalTrafficPolicy *v1.ServiceInternalTrafficPolicy endpoints []endpoint flowTests []packetFlowTest }{ { name: "internalTrafficPolicy is cluster", line: getLine(), internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster), endpoints: []endpoint{ {"", testHostname}, {"", "host1"}, {"", "host2"}, }, flowTests: []packetFlowTest{ { name: "pod to ClusterIP hits all endpoints", sourceIP: "", destIP: "", destPort: 80, output: ",,", masq: false, }, }, }, { name: "internalTrafficPolicy is local and there is one local endpoint", line: getLine(), internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"", testHostname}, {"", "host1"}, {"", "host2"}, }, flowTests: []packetFlowTest{ { name: "pod to ClusterIP hits only local endpoint", sourceIP: "", destIP: "", destPort: 80, output: "", masq: false, }, }, }, { name: "internalTrafficPolicy is local and there are multiple local endpoints", line: getLine(), internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"", testHostname}, {"", testHostname}, {"", "host2"}, }, flowTests: []packetFlowTest{ { name: "pod to ClusterIP hits all local endpoints", sourceIP: "", destIP: "", destPort: 80, output: ",", masq: false, }, }, }, { name: "internalTrafficPolicy is local and there are no local endpoints", line: getLine(), internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"", "host0"}, {"", "host1"}, {"", "host2"}, }, flowTests: []packetFlowTest{ { name: "no endpoints", sourceIP: "", destIP: "", destPort: 80, output: "DROP", }, }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) fp.OnServiceSynced() fp.OnEndpointSlicesSynced() serviceName := "svc1" namespaceName := "ns1" svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "", Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Name: "", Port: 80, Protocol: v1.ProtocolTCP}}, }, } if tc.internalTrafficPolicy != nil { svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy } fp.OnServiceAdd(svc) endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, } for _, ep := range tc.endpoints { endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{ Addresses: []string{ep.ip}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(ep.hostname), }) } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() runPacketFlowTests(t, tc.line, nft, testNodeIPs, tc.flowTests) fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() runPacketFlowTests(t, tc.line, nft, testNodeIPs, []packetFlowTest{ { name: "endpoints deleted", sourceIP: "", destIP: "", destPort: 80, output: "REJECT", }, }) }) } } // TestTerminatingEndpointsTrafficPolicyLocal tests that when there are local ready and // ready + terminating endpoints, only the ready endpoints are used. func TestTerminatingEndpointsTrafficPolicyLocal(t *testing.T) { service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}, Spec: v1.ServiceSpec{ ClusterIP: "", Type: v1.ServiceTypeLoadBalancer, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, Ports: []v1.ServicePort{ { Name: "", TargetPort: intstr.FromInt32(80), Port: 80, Protocol: v1.ProtocolTCP, }, }, HealthCheckNodePort: 30000, }, Status: v1.ServiceStatus{ LoadBalancer: v1.LoadBalancerStatus{ Ingress: []v1.LoadBalancerIngress{ {IP: ""}, }, }, }, } testcases := []struct { name string line string endpointslice *discovery.EndpointSlice flowTests []packetFlowTest }{ { name: "ready endpoints exist", line: getLine(), endpointslice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", "svc1"), Namespace: "ns1", Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { // this endpoint should be ignored for external since there are ready non-terminating endpoints Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // this endpoint should be ignored for external since there are ready non-terminating endpoints Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // this endpoint should be ignored for external since it's not local Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To("host-1"), }, }, }, flowTests: []packetFlowTest{ { name: "pod to clusterIP", sourceIP: "", destIP: "", destPort: 80, output: ",,", masq: false, }, { name: "external to LB", sourceIP: testExternalClient, destIP: "", destPort: 80, output: ",", masq: false, }, }, }, { name: "only terminating endpoints exist", line: getLine(), endpointslice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", "svc1"), Namespace: "ns1", Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { // this endpoint should be used since there are only ready terminating endpoints Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // this endpoint should be used since there are only ready terminating endpoints Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // this endpoint should not be used since it is both terminating and not ready. Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // this endpoint should be ignored for external since it's not local Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To("host-1"), }, }, }, flowTests: []packetFlowTest{ { name: "pod to clusterIP", sourceIP: "", destIP: "", destPort: 80, output: "", masq: false, }, { name: "external to LB", sourceIP: testExternalClient, destIP: "", destPort: 80, output: ",", masq: false, }, }, }, { name: "terminating endpoints on remote node", line: getLine(), endpointslice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", "svc1"), Namespace: "ns1", Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { // this endpoint won't be used because it's not local, // but it will prevent a REJECT rule from being created Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To("host-1"), }, }, }, flowTests: []packetFlowTest{ { name: "pod to clusterIP", sourceIP: "", destIP: "", destPort: 80, output: "", }, { name: "external to LB, no locally-usable endpoints", sourceIP: testExternalClient, destIP: "", destPort: 80, output: "DROP", }, }, }, { name: "no usable endpoints on any node", line: getLine(), endpointslice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", "svc1"), Namespace: "ns1", Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { // Local but not ready or serving Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // Remote and not ready or serving Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To("host-1"), }, }, }, flowTests: []packetFlowTest{ { name: "pod to clusterIP, no usable endpoints", sourceIP: "", destIP: "", destPort: 80, output: "REJECT", }, { name: "external to LB, no usable endpoints", sourceIP: testExternalClient, destIP: "", destPort: 80, output: "REJECT", }, }, }, } for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) fp.OnServiceSynced() fp.OnEndpointSlicesSynced() fp.OnServiceAdd(service) fp.OnEndpointSliceAdd(testcase.endpointslice) fp.syncProxyRules() runPacketFlowTests(t, testcase.line, nft, testNodeIPs, testcase.flowTests) fp.OnEndpointSliceDelete(testcase.endpointslice) fp.syncProxyRules() runPacketFlowTests(t, testcase.line, nft, testNodeIPs, []packetFlowTest{ { name: "pod to clusterIP after endpoints deleted", sourceIP: "", destIP: "", destPort: 80, output: "REJECT", }, { name: "external to LB after endpoints deleted", sourceIP: testExternalClient, destIP: "", destPort: 80, output: "REJECT", }, }) }) } } // TestTerminatingEndpointsTrafficPolicyCluster tests that when there are cluster-wide // ready and ready + terminating endpoints, only the ready endpoints are used. func TestTerminatingEndpointsTrafficPolicyCluster(t *testing.T) { service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}, Spec: v1.ServiceSpec{ ClusterIP: "", Type: v1.ServiceTypeLoadBalancer, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyCluster, Ports: []v1.ServicePort{ { Name: "", TargetPort: intstr.FromInt32(80), Port: 80, Protocol: v1.ProtocolTCP, }, }, HealthCheckNodePort: 30000, }, Status: v1.ServiceStatus{ LoadBalancer: v1.LoadBalancerStatus{ Ingress: []v1.LoadBalancerIngress{ {IP: ""}, }, }, }, } testcases := []struct { name string line string endpointslice *discovery.EndpointSlice flowTests []packetFlowTest }{ { name: "ready endpoints exist", line: getLine(), endpointslice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", "svc1"), Namespace: "ns1", Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { // this endpoint should be ignored since there are ready non-terminating endpoints Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To("another-host"), }, { // this endpoint should be ignored since it is not "serving" Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To("another-host"), }, { Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To("another-host"), }, }, }, flowTests: []packetFlowTest{ { name: "pod to clusterIP", sourceIP: "", destIP: "", destPort: 80, output: ",,", masq: false, }, { name: "external to LB", sourceIP: testExternalClient, destIP: "", destPort: 80, output: ",,", masq: true, }, }, }, { name: "only terminating endpoints exist", line: getLine(), endpointslice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", "svc1"), Namespace: "ns1", Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { // this endpoint should be used since there are only ready terminating endpoints Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // this endpoint should be used since there are only ready terminating endpoints Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // this endpoint should not be used since it is both terminating and not ready. Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To("another-host"), }, { // this endpoint should be used since there are only ready terminating endpoints Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To("another-host"), }, }, }, flowTests: []packetFlowTest{ { name: "pod to clusterIP", sourceIP: "", destIP: "", destPort: 80, output: ",,", masq: false, }, { name: "external to LB", sourceIP: testExternalClient, destIP: "", destPort: 80, output: ",,", masq: true, }, }, }, { name: "terminating endpoints on remote node", line: getLine(), endpointslice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", "svc1"), Namespace: "ns1", Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To("host-1"), }, }, }, flowTests: []packetFlowTest{ { name: "pod to clusterIP", sourceIP: "", destIP: "", destPort: 80, output: "", masq: false, }, { name: "external to LB", sourceIP: testExternalClient, destIP: "", destPort: 80, output: "", masq: true, }, }, }, { name: "no usable endpoints on any node", line: getLine(), endpointslice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", "svc1"), Namespace: "ns1", Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { // Local, not ready or serving Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // Remote, not ready or serving Addresses: []string{""}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To("host-1"), }, }, }, flowTests: []packetFlowTest{ { name: "pod to clusterIP", sourceIP: "", destIP: "", destPort: 80, output: "REJECT", }, { name: "external to LB", sourceIP: testExternalClient, destIP: "", destPort: 80, output: "REJECT", }, }, }, } for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) fp.OnServiceSynced() fp.OnEndpointSlicesSynced() fp.OnServiceAdd(service) fp.OnEndpointSliceAdd(testcase.endpointslice) fp.syncProxyRules() runPacketFlowTests(t, testcase.line, nft, testNodeIPs, testcase.flowTests) fp.OnEndpointSliceDelete(testcase.endpointslice) fp.syncProxyRules() runPacketFlowTests(t, testcase.line, nft, testNodeIPs, []packetFlowTest{ { name: "pod to clusterIP after endpoints deleted", sourceIP: "", destIP: "", destPort: 80, output: "REJECT", }, { name: "external to LB after endpoints deleted", sourceIP: testExternalClient, destIP: "", destPort: 80, output: "REJECT", }, }) }) } } func TestInternalExternalMasquerade(t *testing.T) { // (Put the test setup code in an internal function so we can have it here at the // top, before the test cases that will be run against it.) setupTest := func(fp *Proxier) { makeServiceMap(fp, makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, NodePort: int32(3001), }} svc.Spec.HealthCheckNodePort = 30001 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: "", }} }), makeTestService("ns2", "svc2", func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, NodePort: int32(3002), }} svc.Spec.HealthCheckNodePort = 30002 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: "", }} }), makeTestService("ns3", "svc3", func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, NodePort: int32(3003), }} svc.Spec.HealthCheckNodePort = 30003 svc.Spec.InternalTrafficPolicy = ptr.To(v1.ServiceInternalTrafficPolicyLocal) svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: "", }} }), ) populateEndpointSlices(fp, makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{ { Addresses: []string{""}, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, NodeName: ptr.To("remote"), }, } eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{ { Addresses: []string{""}, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, NodeName: ptr.To("remote"), }, } eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), makeTestEndpointSlice("ns3", "svc3", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{ { Addresses: []string{""}, NodeName: ptr.To(testHostname), }, { Addresses: []string{""}, NodeName: ptr.To("remote"), }, } eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() } // We use the same flowTests for all of the testCases. The "output" and "masq" // values here represent the normal case (working localDetector, no masqueradeAll) flowTests := []packetFlowTest{ { name: "pod to ClusterIP", sourceIP: "", destIP: "", destPort: 80, output: ",", masq: false, }, { name: "pod to NodePort", sourceIP: "", destIP: testNodeIP, destPort: 3001, output: ",", masq: true, }, { name: "pod to LB", sourceIP: "", destIP: "", destPort: 80, output: ",", masq: true, }, { name: "node to ClusterIP", sourceIP: testNodeIP, destIP: "", destPort: 80, output: ",", masq: true, }, { name: "node to NodePort", sourceIP: testNodeIP, destIP: testNodeIP, destPort: 3001, output: ",", masq: true, }, { name: "node to LB", sourceIP: testNodeIP, destIP: "", destPort: 80, output: ",", masq: true, }, { name: "external to ClusterIP", sourceIP: testExternalClient, destIP: "", destPort: 80, output: ",", masq: true, }, { name: "external to NodePort", sourceIP: testExternalClient, destIP: testNodeIP, destPort: 3001, output: ",", masq: true, }, { name: "external to LB", sourceIP: testExternalClient, destIP: "", destPort: 80, output: ",", masq: true, }, { name: "pod to ClusterIP with eTP:Local", sourceIP: "", destIP: "", destPort: 80, // externalTrafficPolicy does not apply to ClusterIP traffic, so same // as "Pod to ClusterIP" output: ",", masq: false, }, { name: "pod to NodePort with eTP:Local", sourceIP: "", destIP: testNodeIP, destPort: 3002, // See the comment below in the "pod to LB with eTP:Local" case. // It doesn't actually make sense to short-circuit here, since if // you connect directly to a NodePort from outside the cluster, // you only get the local endpoints. But it's simpler for us and // slightly more convenient for users to have this case get // short-circuited too. output: ",", masq: false, }, { name: "pod to LB with eTP:Local", sourceIP: "", destIP: "", destPort: 80, // The short-circuit rule is supposed to make this behave the same // way it would if the packet actually went out to the LB and then // came back into the cluster. So it gets routed to all endpoints, // not just local ones. In reality, if the packet actually left // the cluster, it would have to get masqueraded, but since we can // avoid doing that in the short-circuit case, and not masquerading // is more useful, we avoid masquerading. output: ",", masq: false, }, { name: "node to ClusterIP with eTP:Local", sourceIP: testNodeIP, destIP: "", destPort: 80, // externalTrafficPolicy does not apply to ClusterIP traffic, so same // as "node to ClusterIP" output: ",", masq: true, }, { name: "node to NodePort with eTP:Local", sourceIP: testNodeIP, destIP: testNodeIP, destPort: 3001, // The traffic gets short-circuited, ignoring externalTrafficPolicy, so // same as "node to NodePort" above. output: ",", masq: true, }, { name: "node to LB with eTP:Local", sourceIP: testNodeIP, destIP: "", destPort: 80, // The traffic gets short-circuited, ignoring externalTrafficPolicy, so // same as "node to LB" above. output: ",", masq: true, }, { name: "external to ClusterIP with eTP:Local", sourceIP: testExternalClient, destIP: "", destPort: 80, // externalTrafficPolicy does not apply to ClusterIP traffic, so same // as "external to ClusterIP" above. output: ",", masq: true, }, { name: "external to NodePort with eTP:Local", sourceIP: testExternalClient, destIP: testNodeIP, destPort: 3002, // externalTrafficPolicy applies; only the local endpoint is // selected, and we don't masquerade. output: "", masq: false, }, { name: "external to LB with eTP:Local", sourceIP: testExternalClient, destIP: "", destPort: 80, // externalTrafficPolicy applies; only the local endpoint is // selected, and we don't masquerade. output: "", masq: false, }, { name: "pod to ClusterIP with iTP:Local", sourceIP: "", destIP: "", destPort: 80, // internalTrafficPolicy applies; only the local endpoint is // selected. output: "", masq: false, }, { name: "pod to NodePort with iTP:Local", sourceIP: "", destIP: testNodeIP, destPort: 3003, // internalTrafficPolicy does not apply to NodePort traffic, so same as // "pod to NodePort" above. output: ",", masq: true, }, { name: "pod to LB with iTP:Local", sourceIP: "", destIP: "", destPort: 80, // internalTrafficPolicy does not apply to LoadBalancer traffic, so // same as "pod to LB" above. output: ",", masq: true, }, { name: "node to ClusterIP with iTP:Local", sourceIP: testNodeIP, destIP: "", destPort: 80, // internalTrafficPolicy applies; only the local endpoint is selected. // Traffic is masqueraded as in the "node to ClusterIP" case because // internalTrafficPolicy does not affect masquerading. output: "", masq: true, }, { name: "node to NodePort with iTP:Local", sourceIP: testNodeIP, destIP: testNodeIP, destPort: 3003, // internalTrafficPolicy does not apply to NodePort traffic, so same as // "node to NodePort" above. output: ",", masq: true, }, { name: "node to LB with iTP:Local", sourceIP: testNodeIP, destIP: "", destPort: 80, // internalTrafficPolicy does not apply to LoadBalancer traffic, so // same as "node to LB" above. output: ",", masq: true, }, { name: "external to ClusterIP with iTP:Local", sourceIP: testExternalClient, destIP: "", destPort: 80, // internalTrafficPolicy applies; only the local endpoint is selected. // Traffic is masqueraded as in the "external to ClusterIP" case // because internalTrafficPolicy does not affect masquerading. output: "", masq: true, }, { name: "external to NodePort with iTP:Local", sourceIP: testExternalClient, destIP: testNodeIP, destPort: 3003, // internalTrafficPolicy does not apply to NodePort traffic, so same as // "external to NodePort" above. output: ",", masq: true, }, { name: "external to LB with iTP:Local", sourceIP: testExternalClient, destIP: "", destPort: 80, // internalTrafficPolicy does not apply to LoadBalancer traffic, so // same as "external to LB" above. output: ",", masq: true, }, } type packetFlowTestOverride struct { output *string masq *bool } testCases := []struct { name string line string masqueradeAll bool localDetector bool overrides map[string]packetFlowTestOverride }{ { name: "base", line: getLine(), masqueradeAll: false, localDetector: true, overrides: nil, }, { name: "no LocalTrafficDetector", line: getLine(), masqueradeAll: false, localDetector: false, overrides: map[string]packetFlowTestOverride{ // With no LocalTrafficDetector, all traffic to a // ClusterIP is assumed to be from a pod, and thus to not // require masquerading. "node to ClusterIP": { masq: ptr.To(false), }, "node to ClusterIP with eTP:Local": { masq: ptr.To(false), }, "node to ClusterIP with iTP:Local": { masq: ptr.To(false), }, "external to ClusterIP": { masq: ptr.To(false), }, "external to ClusterIP with eTP:Local": { masq: ptr.To(false), }, "external to ClusterIP with iTP:Local": { masq: ptr.To(false), }, // And there's no eTP:Local short-circuit for pod traffic, // so pods get only the local endpoints. "pod to NodePort with eTP:Local": { output: ptr.To(""), }, "pod to LB with eTP:Local": { output: ptr.To(""), }, }, }, { name: "masqueradeAll", line: getLine(), masqueradeAll: true, localDetector: true, overrides: map[string]packetFlowTestOverride{ // All "to ClusterIP" traffic gets masqueraded when using // --masquerade-all. "pod to ClusterIP": { masq: ptr.To(true), }, "pod to ClusterIP with eTP:Local": { masq: ptr.To(true), }, "pod to ClusterIP with iTP:Local": { masq: ptr.To(true), }, }, }, { name: "masqueradeAll, no LocalTrafficDetector", line: getLine(), masqueradeAll: true, localDetector: false, overrides: map[string]packetFlowTestOverride{ // As in "masqueradeAll" "pod to ClusterIP": { masq: ptr.To(true), }, "pod to ClusterIP with eTP:Local": { masq: ptr.To(true), }, "pod to ClusterIP with iTP:Local": { masq: ptr.To(true), }, // As in "no LocalTrafficDetector" "pod to NodePort with eTP:Local": { output: ptr.To(""), }, "pod to LB with eTP:Local": { output: ptr.To(""), }, }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) fp.masqueradeAll = tc.masqueradeAll if !tc.localDetector { fp.localDetector = proxyutiliptables.NewNoOpLocalDetector() } setupTest(fp) // Merge base flowTests with per-test-case overrides tcFlowTests := make([]packetFlowTest, len(flowTests)) overridesApplied := 0 for i := range flowTests { tcFlowTests[i] = flowTests[i] if overrides, set := tc.overrides[flowTests[i].name]; set { overridesApplied++ if overrides.masq != nil { if tcFlowTests[i].masq == *overrides.masq { t.Errorf("%q override value for masq is same as base value", flowTests[i].name) } tcFlowTests[i].masq = *overrides.masq } if overrides.output != nil { if tcFlowTests[i].output == *overrides.output { t.Errorf("%q override value for output is same as base value", flowTests[i].name) } tcFlowTests[i].output = *overrides.output } } } if overridesApplied != len(tc.overrides) { t.Errorf("%d overrides did not match any test case name!", len(tc.overrides)-overridesApplied) } runPacketFlowTests(t, tc.line, nft, testNodeIPs, tcFlowTests) }) } } // Test calling syncProxyRules() multiple times with various changes func TestSyncProxyRulesRepeated(t *testing.T) { nft, fp := NewFakeProxier(v1.IPv4Protocol) baseRules := dedent.Dedent(` add table ip kube-proxy { comment "rules for kube-proxy" ; } add chain ip kube-proxy cluster-ips-check add chain ip kube-proxy filter-prerouting { type filter hook prerouting priority -110 ; } add chain ip kube-proxy filter-forward { type filter hook forward priority -110 ; } add chain ip kube-proxy filter-input { type filter hook input priority -110 ; } add chain ip kube-proxy filter-output { type filter hook output priority -110 ; } add chain ip kube-proxy filter-output-post-dnat { type filter hook output priority -90 ; } add chain ip kube-proxy firewall-check add chain ip kube-proxy mark-for-masquerade add chain ip kube-proxy masquerading add chain ip kube-proxy nat-output { type nat hook output priority -100 ; } add chain ip kube-proxy nat-postrouting { type nat hook postrouting priority 100 ; } add chain ip kube-proxy nat-prerouting { type nat hook prerouting priority -100 ; } add chain ip kube-proxy nodeport-endpoints-check add chain ip kube-proxy reject-chain { comment "helper for @no-endpoint-services / @no-endpoint-nodeports" ; } add chain ip kube-proxy services add chain ip kube-proxy service-endpoints-check add rule ip kube-proxy cluster-ips-check ip daddr @cluster-ips reject comment "Reject traffic to invalid ports of ClusterIPs" add rule ip kube-proxy cluster-ips-check ip daddr { } drop comment "Drop traffic to unallocated ClusterIPs" add rule ip kube-proxy filter-prerouting ct state new jump firewall-check add rule ip kube-proxy filter-forward ct state new jump service-endpoints-check add rule ip kube-proxy filter-forward ct state new jump cluster-ips-check add rule ip kube-proxy filter-input ct state new jump nodeport-endpoints-check add rule ip kube-proxy filter-input ct state new jump service-endpoints-check add rule ip kube-proxy filter-output ct state new jump service-endpoints-check add rule ip kube-proxy filter-output ct state new jump firewall-check add rule ip kube-proxy filter-output-post-dnat ct state new jump cluster-ips-check add rule ip kube-proxy firewall-check ip daddr . meta l4proto . th dport vmap @firewall-ips add rule ip kube-proxy mark-for-masquerade mark set mark or 0x4000 add rule ip kube-proxy masquerading mark and 0x4000 == 0 return add rule ip kube-proxy masquerading mark set mark xor 0x4000 add rule ip kube-proxy masquerading masquerade fully-random add rule ip kube-proxy nat-output jump services add rule ip kube-proxy nat-postrouting jump masquerading add rule ip kube-proxy nat-prerouting jump services add rule ip kube-proxy nodeport-endpoints-check ip daddr @nodeport-ips meta l4proto . th dport vmap @no-endpoint-nodeports add rule ip kube-proxy reject-chain reject add rule ip kube-proxy services ip daddr . meta l4proto . th dport vmap @service-ips add rule ip kube-proxy services ip daddr @nodeport-ips meta l4proto . th dport vmap @service-nodeports add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; } add set ip kube-proxy nodeport-ips { type ipv4_addr ; comment "IPs that accept NodePort traffic" ; } add rule ip kube-proxy service-endpoints-check ip daddr . meta l4proto . th dport vmap @no-endpoint-services add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; } add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; } add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; } add map ip kube-proxy service-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "ClusterIP, ExternalIP and LoadBalancer IP traffic" ; } add map ip kube-proxy service-nodeports { type inet_proto . inet_service : verdict ; comment "NodePort traffic" ; } `) // Helper function to make it look like time has passed (from the point of view of // the stale-chain-deletion code). ageStaleChains := func() { for chain, t := range fp.staleChains { fp.staleChains[chain] = t.Add(-2 * time.Second) } } // Create initial state var svc2 *v1.Service makeServiceMap(fp, makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, }} }), makeTestService("ns2", "svc2", func(svc *v1.Service) { svc2 = svc svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p8080", Port: 8080, Protocol: v1.ProtocolTCP, }} }), ) populateEndpointSlices(fp, makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p8080"), Port: ptr.To[int32](8080), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() expected := baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr tcp dport 8080 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 } add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Add a new service and its endpoints makeServiceMap(fp, makeTestService("ns3", "svc3", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, }} }), ) var eps3 *discovery.EndpointSlice populateEndpointSlices(fp, makeTestEndpointSlice("ns3", "svc3", 1, func(eps *discovery.EndpointSlice) { eps3 = eps eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr tcp dport 8080 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 } add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Delete a service; its chains will be flushed, but not immediately deleted. fp.OnServiceDelete(svc2) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Fake the passage of time and confirm that the stale chains get deleted. ageStaleChains() fp.syncProxyRules() expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Add a service, sync, then add its endpoints. makeServiceMap(fp, makeTestService("ns4", "svc4", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, }} }), ) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to add element ip kube-proxy no-endpoint-services { . tcp . 80 comment "ns4/svc4:p80" : goto reject-chain } `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) populateEndpointSlices(fp, makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Change an endpoint of an existing service. eps3update := eps3.DeepCopy() eps3update.Endpoints[0].Addresses[0] = "" fp.OnEndpointSliceUpdate(eps3, eps3update) fp.syncProxyRules() // The old endpoint chain (for will not be deleted yet. expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 meta l4proto tcp dnat to add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // (Ensure the old svc3 chain gets deleted in the next sync.) ageStaleChains() // Add an endpoint to a service. eps3update2 := eps3update.DeepCopy() eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{""}}) fp.OnEndpointSliceUpdate(eps3update, eps3update2) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 , 1 : goto endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 } add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 meta l4proto tcp dnat to add chain ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 add rule ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 meta l4proto tcp dnat to add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Empty a service's endpoints; its chains will be flushed, but not immediately deleted. eps3update3 := eps3update2.DeepCopy() eps3update3.Endpoints = []discovery.Endpoint{} fp.OnEndpointSliceUpdate(eps3update2, eps3update3) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy no-endpoint-services { . tcp . 80 comment "ns3/svc3:p80" : goto reject-chain } add element ip kube-proxy service-ips { . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 add chain ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) expectedStaleChains := sets.NewString("service-4AT6LBPK-ns3/svc3/tcp/p80", "endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80", "endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80") gotStaleChains := sets.StringKeySet(fp.staleChains) if !expectedStaleChains.Equal(gotStaleChains) { t.Errorf("expected stale chains %v, got %v", expectedStaleChains, gotStaleChains) } // Restore endpoints to non-empty immediately; its chains will be restored, and deleted from staleChains. fp.OnEndpointSliceUpdate(eps3update3, eps3update2) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy cluster-ips { } add element ip kube-proxy nodeport-ips { } add element ip kube-proxy service-ips { . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } add element ip kube-proxy service-ips { . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 } add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 , 1 : goto endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 } add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 meta l4proto tcp dnat to add chain ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 add rule ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 meta l4proto tcp dnat to add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr tcp dport 80 ip saddr != jump mark-for-masquerade add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr jump mark-for-masquerade add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) if len(fp.staleChains) != 0 { t.Errorf("unexpected stale chains: %v", fp.staleChains) } // Empty a service's endpoints and restore it after stale chains age. // - its chains will be flushed, but not immediately deleted in the first sync. // - its chains will be deleted first, then recreated in the second sync. fp.OnEndpointSliceUpdate(eps3update2, eps3update3) fp.syncProxyRules() ageStaleChains() fp.OnEndpointSliceUpdate(eps3update3, eps3update2) fp.syncProxyRules() // The second change counteracts the first one, so same expected rules as last time assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Sync with no new changes, so same expected rules as last time fp.syncProxyRules() assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) } func TestNoEndpointsMetric(t *testing.T) { type endpoint struct { ip string hostname string } metrics.RegisterMetrics() testCases := []struct { name string internalTrafficPolicy *v1.ServiceInternalTrafficPolicy externalTrafficPolicy v1.ServiceExternalTrafficPolicy endpoints []endpoint expectedSyncProxyRulesNoLocalEndpointsTotalInternal int expectedSyncProxyRulesNoLocalEndpointsTotalExternal int }{ { name: "internalTrafficPolicy is set and there are local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"", testHostname}, {"", "host1"}, {"", "host2"}, }, }, { name: "externalTrafficPolicy is set and there are local endpoints", externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{ {"", testHostname}, {"", "host1"}, {"", "host2"}, }, }, { name: "both policies are set and there are local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{ {"", testHostname}, {"", "host1"}, {"", "host2"}, }, }, { name: "internalTrafficPolicy is set and there are no local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"", "host0"}, {"", "host1"}, {"", "host2"}, }, expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, }, { name: "externalTrafficPolicy is set and there are no local endpoints", externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{ {"", "host0"}, {"", "host1"}, {"", "host2"}, }, expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, }, { name: "both policies are set and there are no local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{ {"", "host0"}, {"", "host1"}, {"", "host2"}, }, expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, }, { name: "both policies are set and there are no endpoints at all", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{}, expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 0, expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 0, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { _, fp := NewFakeProxier(v1.IPv4Protocol) fp.OnServiceSynced() fp.OnEndpointSlicesSynced() serviceName := "svc1" namespaceName := "ns1" svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "", Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Name: "", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 123}}, }, } if tc.internalTrafficPolicy != nil { svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy } if tc.externalTrafficPolicy != "" { svc.Spec.Type = v1.ServiceTypeNodePort svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy } fp.OnServiceAdd(svc) endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, } for _, ep := range tc.endpoints { endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{ Addresses: []string{ep.ip}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(ep.hostname), }) } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal")) if err != nil { t.Errorf("failed to get %s value, err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) } if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) { t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal) } syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external")) if err != nil { t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) } if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) { t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal) } }) } } func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) { testCases := []struct { name string ipModeEnabled bool svcIP string svcLBIP string ipMode *v1.LoadBalancerIPMode expectedRule bool }{ /* LoadBalancerIPMode disabled */ { name: "LoadBalancerIPMode disabled, ipMode Proxy", ipModeEnabled: false, svcIP: "", svcLBIP: "", ipMode: ptr.To(v1.LoadBalancerIPModeProxy), expectedRule: true, }, { name: "LoadBalancerIPMode disabled, ipMode VIP", ipModeEnabled: false, svcIP: "", svcLBIP: "", ipMode: ptr.To(v1.LoadBalancerIPModeVIP), expectedRule: true, }, { name: "LoadBalancerIPMode disabled, ipMode nil", ipModeEnabled: false, svcIP: "", svcLBIP: "", ipMode: nil, expectedRule: true, }, /* LoadBalancerIPMode enabled */ { name: "LoadBalancerIPMode enabled, ipMode Proxy", ipModeEnabled: true, svcIP: "", svcLBIP: "", ipMode: ptr.To(v1.LoadBalancerIPModeProxy), expectedRule: false, }, { name: "LoadBalancerIPMode enabled, ipMode VIP", ipModeEnabled: true, svcIP: "", svcLBIP: "", ipMode: ptr.To(v1.LoadBalancerIPModeVIP), expectedRule: true, }, { name: "LoadBalancerIPMode enabled, ipMode nil", ipModeEnabled: true, svcIP: "", svcLBIP: "", ipMode: nil, expectedRule: true, }, } svcPort := 80 svcNodePort := 3001 svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", Protocol: v1.ProtocolTCP, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)() nft, fp := NewFakeProxier(v1.IPv4Protocol) makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = testCase.svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: testCase.svcLBIP, IPMode: testCase.ipMode, }} }), ) populateEndpointSlices(fp, makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{""}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() element := nft.Table.Maps["service-ips"].FindElement(testCase.svcLBIP, "tcp", "80") ruleExists := element != nil if ruleExists != testCase.expectedRule { t.Errorf("unexpected rule for %s", testCase.svcLBIP) } }) } } func Test_servicePortChainNameBase(t *testing.T) { testCases := []struct { name string spn proxy.ServicePortName protocol string expected string }{ { name: "simple", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "testing", Name: "service", }, Port: "http", }, protocol: "tcp", expected: "P4ZYZVCF-testing/service/tcp/http", }, { name: "different port, different hash", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "testing", Name: "service", }, Port: "https", }, protocol: "tcp", expected: "LZBRENCP-testing/service/tcp/https", }, { name: "max length", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx", Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really", }, Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls", }, protocol: "sctp", expected: "KR6NACJP-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls", }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { name := servicePortChainNameBase(&tc.spn, tc.protocol) if name != tc.expected { t.Errorf("expected %q, got %q", tc.expected, name) } }) } } func Test_servicePortEndpointChainNameBase(t *testing.T) { testCases := []struct { name string spn proxy.ServicePortName protocol string endpoint string expected string }{ { name: "simple", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "testing", Name: "service", }, Port: "http", }, protocol: "tcp", endpoint: "", expected: "JO2XBXZR-testing/service/tcp/http__10.180.0.1/80", }, { name: "different endpoint, different hash", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "testing", Name: "service", }, Port: "http", }, protocol: "tcp", endpoint: "", expected: "5S6H3H22-testing/service/tcp/http__10.180.0.2/80", }, { name: "ipv6", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "testing", Name: "service", }, Port: "http", }, protocol: "tcp", endpoint: "[fd80:abcd:12::a1b2:c3d4:e5f6:9999]:80", expected: "U7E2ET36-testing/service/tcp/http__fd80.abcd.12..a1b2.c3d4.e5f6.9999/80", }, { name: "max length without truncation", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx", Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really", }, Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls", }, protocol: "sctp", endpoint: "[1234:5678:9abc:def0::abc:1234]:443", expected: "5YS7AFEA-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abc.1234/443", }, { name: "truncated, 1", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx", Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really", }, Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls", }, protocol: "sctp", endpoint: "[1234:5678:9abc:def0::abcd:1234:5678]:443", expected: "CI6C53Q3-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abcd.1234...", }, { name: "truncated, 2 (different IP, which is not visible in the result)", spn: proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx", Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really", }, Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls", }, protocol: "sctp", endpoint: "[1234:5678:9abc:def0::abcd:1234:8765]:443", expected: "2FLXFK6X-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abcd.1234...", }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { name := servicePortEndpointChainNameBase(&tc.spn, tc.protocol, tc.endpoint) if name != tc.expected { t.Errorf("expected %q, got %q", tc.expected, name) } }) } } func TestProxier_OnServiceCIDRsChanged(t *testing.T) { var proxier *Proxier proxier = &Proxier{ipFamily: v1.IPv4Protocol} proxier.OnServiceCIDRsChanged([]string{"", "fd00:10:96::/112"}) assert.Equal(t, proxier.serviceCIDRs, "") proxier.OnServiceCIDRsChanged([]string{"", "", "fd00:10:96::/112", "fd00:172:30::/112"}) assert.Equal(t, proxier.serviceCIDRs, ",") proxier = &Proxier{ipFamily: v1.IPv6Protocol} proxier.OnServiceCIDRsChanged([]string{"", "fd00:10:96::/112"}) assert.Equal(t, proxier.serviceCIDRs, "fd00:10:96::/112") proxier.OnServiceCIDRsChanged([]string{"", "", "fd00:10:96::/112", "fd00:172:30::/112"}) assert.Equal(t, proxier.serviceCIDRs, "fd00:10:96::/112,fd00:172:30::/112") }