//go:build linux // +build linux /* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package ipvs import ( "bytes" "fmt" "net" "reflect" "sort" "strings" "testing" "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset" ipsettest "k8s.io/kubernetes/pkg/proxy/ipvs/ipset/testing" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" utilipvs "k8s.io/kubernetes/pkg/proxy/ipvs/util" ipvstest "k8s.io/kubernetes/pkg/proxy/ipvs/util/testing" "k8s.io/kubernetes/pkg/proxy/metrics" proxyutil "k8s.io/kubernetes/pkg/proxy/util" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" ) const testHostname = "test-hostname" // fakeIpvs implements utilipvs.Interface type fakeIpvs struct { 
ipvsErr string vsCreated bool } func (f *fakeIpvs) Flush() error { return nil } func (f *fakeIpvs) AddVirtualServer(*utilipvs.VirtualServer) error { if f.ipvsErr == "AddVirtualServer" { return fmt.Errorf("oops") } f.vsCreated = true return nil } func (f *fakeIpvs) UpdateVirtualServer(*utilipvs.VirtualServer) error { return nil } func (f *fakeIpvs) DeleteVirtualServer(*utilipvs.VirtualServer) error { if f.ipvsErr == "DeleteVirtualServer" { return fmt.Errorf("oops") } return nil } func (f *fakeIpvs) GetVirtualServer(*utilipvs.VirtualServer) (*utilipvs.VirtualServer, error) { return nil, nil } func (f *fakeIpvs) GetVirtualServers() ([]*utilipvs.VirtualServer, error) { if f.ipvsErr == "GetVirtualServers" { return nil, fmt.Errorf("oops") } if f.vsCreated { vs := []*utilipvs.VirtualServer{{}} return vs, nil } return nil, nil } func (f *fakeIpvs) AddRealServer(*utilipvs.VirtualServer, *utilipvs.RealServer) error { return nil } func (f *fakeIpvs) GetRealServers(*utilipvs.VirtualServer) ([]*utilipvs.RealServer, error) { return nil, nil } func (f *fakeIpvs) DeleteRealServer(*utilipvs.VirtualServer, *utilipvs.RealServer) error { return nil } func (f *fakeIpvs) UpdateRealServer(*utilipvs.VirtualServer, *utilipvs.RealServer) error { return nil } func (f *fakeIpvs) ConfigureTimeouts(time.Duration, time.Duration, time.Duration) error { return nil } // fakeIPSetVersioner implements IPSetVersioner. type fakeIPSetVersioner struct { version string err error } func (fake *fakeIPSetVersioner) GetVersion() (string, error) { return fake.version, fake.err } func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []string, excludeCIDRs []*net.IPNet, ipFamily v1.IPFamily) *Proxier { netlinkHandle := netlinktest.NewFakeNetlinkHandle(ipFamily == v1.IPv6Protocol) netlinkHandle.SetLocalAddresses("eth0", nodeIPs...) 
// initialize ipsetList with all sets we needed ipsetList := make(map[string]*IPSet) for _, is := range ipsetInfo { ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment) } p := &Proxier{ svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil), excludeCIDRs: excludeCIDRs, iptables: ipt, ipvs: ipvs, ipset: ipset, conntrack: conntrack.NewFake(), strictARP: false, localDetector: proxyutiliptables.NewNoOpLocalDetector(), hostname: testHostname, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), ipvsScheduler: defaultScheduler, iptablesData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil), natChains: proxyutil.NewLineBuffer(), natRules: proxyutil.NewLineBuffer(), filterChains: proxyutil.NewLineBuffer(), filterRules: proxyutil.NewLineBuffer(), netlinkHandle: netlinkHandle, ipsetList: ipsetList, nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil, nil), networkInterfacer: proxyutiltest.NewFakeNetwork(), gracefuldeleteManager: NewGracefulTerminationManager(ipvs), ipFamily: ipFamily, } p.setInitialized(true) p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) return p } func makeNSN(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) { for i := range allServices { proxier.OnServiceAdd(allServices[i]) } proxier.mu.Lock() defer proxier.mu.Unlock() proxier.servicesSynced = true } func makeEndpointSliceMap(proxier *Proxier, allEpSlices ...*discovery.EndpointSlice) { for i := range allEpSlices { proxier.OnEndpointSliceAdd(allEpSlices[i]) } proxier.mu.Lock() defer proxier.mu.Unlock() proxier.endpointSlicesSynced = true } func makeTestService(namespace, 
name string, svcFunc func(*v1.Service)) *v1.Service { svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, Annotations: map[string]string{}, }, Spec: v1.ServiceSpec{}, Status: v1.ServiceStatus{}, } svcFunc(svc) return svc } func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) { for i := range allEndpointSlices { proxier.OnEndpointSliceAdd(allEndpointSlices[i]) } } func makeTestEndpointSlice(namespace, name string, sliceNum int, epsFunc func(*discovery.EndpointSlice)) *discovery.EndpointSlice { eps := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%d", name, sliceNum), Namespace: namespace, Labels: map[string]string{discovery.LabelServiceName: name}, }, } epsFunc(eps) return eps } func TestCleanupLeftovers(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), }} }), ) epIP := "10.180.0.1" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() // test cleanup left over if CleanupLeftovers(ipvs, ipt, ipset) { t.Errorf("Cleanup leftovers failed") } } func 
TestCanUseIPVSProxier(t *testing.T) {
	// TestCanUseIPVSProxier checks CanUseIPVSProxier against a stubbed ipvs
	// interface and ipset versioner. Each case says which dependency fails
	// (if any) and whether a nil error is expected.
	testCases := []struct {
		name         string
		scheduler    string
		ipsetVersion string
		ipsetErr     error
		ipvsErr      string
		ok           bool
	}{
		{
			name:         "happy days",
			ipsetVersion: MinIPSetCheckVersion,
			ok:           true,
		},
		{
			name:         "ipset error",
			scheduler:    "",
			ipsetVersion: MinIPSetCheckVersion,
			ipsetErr:     fmt.Errorf("oops"),
			ok:           false,
		},
		{
			name:         "ipset version too low",
			scheduler:    "rr",
			ipsetVersion: "4.3.0",
			ok:           false,
		},
		{
			name:         "GetVirtualServers fail",
			ipsetVersion: MinIPSetCheckVersion,
			ipvsErr:      "GetVirtualServers",
			ok:           false,
		},
		{
			name:         "AddVirtualServer fail",
			ipsetVersion: MinIPSetCheckVersion,
			ipvsErr:      "AddVirtualServer",
			ok:           false,
		},
		{
			name:         "DeleteVirtualServer fail",
			ipsetVersion: MinIPSetCheckVersion,
			ipvsErr:      "DeleteVirtualServer",
			ok:           false,
		},
	}
	for _, tc := range testCases {
		// Subtests let a single case be selected with -run and keep
		// failures attributed to the case that produced them.
		t.Run(tc.name, func(t *testing.T) {
			// Keyed literal: stays correct if fakeIpvs fields are reordered.
			ipvs := &fakeIpvs{ipvsErr: tc.ipvsErr}
			versioner := &fakeIPSetVersioner{version: tc.ipsetVersion, err: tc.ipsetErr}
			err := CanUseIPVSProxier(ipvs, versioner, tc.scheduler)
			if (err == nil) != tc.ok {
				t.Errorf("Case [%s], expect %v, got err: %v", tc.name, tc.ok, err)
			}
		})
	}
}

// TestGetNodeIPs registers per-device addresses on a fake netlink handle and
// checks that "all local addresses minus the addresses of kube-ipvs0" equals
// expectIPs. isIPv6 selects the family the fake handle is created for.
func TestGetNodeIPs(t *testing.T) {
	testCases := []struct {
		isIPv6       bool
		devAddresses map[string][]string
		expectIPs    []string
	}{
		// case 0
		{
			devAddresses: map[string][]string{"eth0": {"1.2.3.4"}, "lo": {"127.0.0.1"}},
			expectIPs:    []string{"1.2.3.4"},
		},
		// case 1
		{
			devAddresses: map[string][]string{"lo": {"127.0.0.1"}},
			expectIPs:    []string{},
		},
		// case 2
		{
			devAddresses: map[string][]string{},
			expectIPs:    []string{},
		},
		// case 3
		{
			devAddresses: map[string][]string{"encap0": {"10.20.30.40", "fe80::200:ff:fe01:1"}, "lo": {"127.0.0.1", "::1"}, "docker0": {"172.17.0.1"}},
			expectIPs:    []string{"10.20.30.40", "172.17.0.1"},
		},
		// case 4
		{
			devAddresses: map[string][]string{"encaps9": {"10.20.30.40"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"1000::", "10.20.30.31"}},
			expectIPs:    []string{"10.20.30.40", "10.20.30.31"},
		},
		// case 5
		{
			devAddresses: map[string][]string{"kube-ipvs0": {"2000::", "1.2.3.4"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"1000::", "10.20.30.31"}},
			expectIPs:    []string{"10.20.30.31"},
		},
		// case 6
		{
			devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}, "lo": {"127.0.0.1", "::1"}},
			expectIPs:    []string{},
		},
		// case 7
		{
			devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}},
			expectIPs:    []string{},
		},
		// case 8
		{
			devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}, "eth5": {"3.4.5.6"}, "lo": {"127.0.0.1", "::1"}},
			expectIPs:    []string{"3.4.5.6"},
		},
		// case 9
		{
			devAddresses: map[string][]string{"ipvs0": {"1.2.3.4"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"10.20.30.31"}},
			expectIPs:    []string{"10.20.30.31", "1.2.3.4"},
		},
		// case 10
		{
			isIPv6:       true,
			devAddresses: map[string][]string{"ipvs0": {"1.2.3.4", "1000::"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"10.20.30.31", "2000::", "fe80::200:ff:fe01:1"}},
			expectIPs:    []string{"1000::", "2000::"},
		},
		// case 11
		{
			isIPv6:       true,
			devAddresses: map[string][]string{"ipvs0": {"1.2.3.4", "1000::"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"10.20.30.31", "2000::", "fe80::200:ff:fe01:1"}, "kube-ipvs0": {"1.2.3.4", "2.3.4.5", "2000::"}},
			expectIPs:    []string{"1000::"},
		},
	}
	for i, tc := range testCases {
		t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
			fake := netlinktest.NewFakeNetlinkHandle(tc.isIPv6)
			for dev, addresses := range tc.devAddresses {
				fake.SetLocalAddresses(dev, addresses...)
			}

			// A lookup failure makes the comparison below meaningless, so
			// stop this case instead of reporting a second, derived failure.
			ips, err := fake.GetAllLocalAddresses()
			if err != nil {
				t.Fatalf("Unexpected error: %v", err)
			}
			devIps, err := fake.GetLocalAddresses("kube-ipvs0")
			if err != nil {
				t.Fatalf("Unexpected error: %v", err)
			}

			ips = ips.Difference(devIps)
			if !ips.Equal(sets.New(tc.expectIPs...)) {
				t.Errorf("case[%d], unexpected mismatch, expected: %v, got: %v", i, tc.expectIPs, ips)
			}
		})
	}
}

// TestNodePortIPv4 is table-driven: each case lists services, endpoint
// slices, node IPs and nodePortAddresses, plus the fake IPVS state expected
// after a sync and, optionally, expected ipsets and iptables chains.
func TestNodePortIPv4(t *testing.T) {
	tests := []struct {
		name                   string
		services               []*v1.Service
		endpoints              []*discovery.EndpointSlice
		nodeIPs                []string
		nodePortAddresses      []string
		expectedIPVS           *ipvstest.FakeIPVS
		expectedIPSets         netlinktest.ExpectedIPSet
		expectedIptablesChains netlinktest.ExpectedIptablesChain
	}{
		{
			// One IPv4 and one IPv6 endpoint slice are supplied for the
			// same service.
			name: "1 service with node port, has 2 endpoints",
			services: []*v1.Service{
				makeTestService("ns1", "svc1", func(svc *v1.Service) {
					svc.Spec.Type = "NodePort"
					svc.Spec.ClusterIP = "10.20.30.41"
					svc.Spec.Ports = []v1.ServicePort{{
						Name:     "p80",
						Port:     int32(80),
						Protocol: v1.ProtocolTCP,
						NodePort: int32(3001),
					}}
				}),
			},
			endpoints: []*discovery.EndpointSlice{
				makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
					eps.AddressType = discovery.AddressTypeIPv4
					eps.Endpoints = []discovery.Endpoint{{
						Addresses: []string{"10.180.0.1"},
					}}
					eps.Ports = []discovery.EndpointPort{{
						Name:     ptr.To("p80"),
						Port:     ptr.To[int32](80),
						Protocol: ptr.To(v1.ProtocolTCP),
					}}
				}),
				makeTestEndpointSlice("ns1", "svc1", 2, func(eps *discovery.EndpointSlice) {
					eps.AddressType = discovery.AddressTypeIPv6
					eps.Endpoints = []discovery.Endpoint{{
						Addresses: []string{"1002:ab8::2:10"},
					}}
					eps.Ports = []discovery.EndpointPort{{
						Name:     ptr.To("p80"),
						Port:     ptr.To[int32](80),
						Protocol: ptr.To(v1.ProtocolTCP),
					}}
				}),
			},
			nodeIPs:           []string{"100.101.102.103", "2001:db8::1:1"},
			nodePortAddresses: []string{},
			expectedIPVS: &ipvstest.FakeIPVS{
				Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
					{
						IP:       "10.20.30.41",
						Port:     80,
						Protocol: "TCP",
					}: {
						Address:  netutils.ParseIPSloppy("10.20.30.41"),
						Protocol: "TCP",
						Port:
uint16(80), Scheduler: "rr", }, { IP: "100.101.102.103", Port: 3001, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("100.101.102.103"), Protocol: "TCP", Port: uint16(3001), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "10.20.30.41", Port: 80, Protocol: "TCP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, { IP: "100.101.102.103", Port: 3001, Protocol: "TCP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, }, }, }, { name: "1 UDP service with node port, has endpoints", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "10.20.30.41" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolUDP, NodePort: int32(3001), }} }), }, endpoints: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"10.180.0.1"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolUDP), }} }), }, nodeIPs: []string{"100.101.102.103"}, nodePortAddresses: []string{"0.0.0.0/0"}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "10.20.30.41", Port: 80, Protocol: "UDP", }: { Address: netutils.ParseIPSloppy("10.20.30.41"), Protocol: "UDP", Port: uint16(80), Scheduler: "rr", }, { IP: "100.101.102.103", Port: 3001, Protocol: "UDP", }: { Address: netutils.ParseIPSloppy("100.101.102.103"), Protocol: "UDP", Port: uint16(3001), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "10.20.30.41", Port: 80, Protocol: "UDP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, { IP: "100.101.102.103", Port: 3001, Protocol: "UDP", }: { 
{ Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, }, }, expectedIPSets: netlinktest.ExpectedIPSet{ kubeNodePortSetUDP: {{ Port: 3001, Protocol: strings.ToLower(string(v1.ProtocolUDP)), SetType: utilipset.BitmapPort, }}, }, expectedIptablesChains: netlinktest.ExpectedIptablesChain{ string(kubeNodePortChain): {{ JumpChain: string(kubeMarkMasqChain), MatchSet: kubeNodePortSetUDP, }, { JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, string(kubeServicesChain): {{ JumpChain: "RETURN", SourceAddress: "127.0.0.0/8", }, { JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet, }, { JumpChain: string(kubeNodePortChain), MatchSet: "", }, { JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }}, }, }, { name: "service has node port but no endpoints", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "10.20.30.41" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolTCP, NodePort: int32(3001), }} }), }, endpoints: []*discovery.EndpointSlice{}, nodeIPs: []string{"100.101.102.103"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "10.20.30.41", Port: 80, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("10.20.30.41"), Protocol: "TCP", Port: uint16(80), Scheduler: "rr", }, { IP: "100.101.102.103", Port: 3001, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("100.101.102.103"), Protocol: "TCP", Port: uint16(3001), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "10.20.30.41", Port: 80, Protocol: "TCP", }: {}, // no real servers corresponding to no endpoints { IP: "100.101.102.103", Port: 3001, Protocol: "TCP", }: {}, // no real servers corresponding to no endpoints }, }, }, { name: "node port service with protocol sctp on a node with multiple nodeIPs", services: []*v1.Service{ 
makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "10.20.30.41" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolSCTP, NodePort: int32(3001), }} }), }, endpoints: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"10.180.0.1"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolSCTP), }} }), }, nodeIPs: []string{ "100.101.102.103", "100.101.102.104", "100.101.102.105", "2001:db8::1:1", "2001:db8::1:2", "2001:db8::1:3", }, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "10.20.30.41", Port: 80, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("10.20.30.41"), Protocol: "SCTP", Port: uint16(80), Scheduler: "rr", }, { IP: "100.101.102.103", Port: 3001, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("100.101.102.103"), Protocol: "SCTP", Port: uint16(3001), Scheduler: "rr", }, { IP: "100.101.102.104", Port: 3001, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("100.101.102.104"), Protocol: "SCTP", Port: uint16(3001), Scheduler: "rr", }, { IP: "100.101.102.105", Port: 3001, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("100.101.102.105"), Protocol: "SCTP", Port: uint16(3001), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "10.20.30.41", Port: 80, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, { IP: "100.101.102.103", Port: 3001, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, { IP: "100.101.102.104", Port: 3001, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: 
uint16(80), Weight: 1, }, }, { IP: "100.101.102.105", Port: 3001, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, }, }, expectedIPSets: netlinktest.ExpectedIPSet{ kubeNodePortSetSCTP: { { IP: "100.101.102.103", Port: 3001, Protocol: strings.ToLower(string(v1.ProtocolSCTP)), SetType: utilipset.HashIPPort, }, { IP: "100.101.102.104", Port: 3001, Protocol: strings.ToLower(string(v1.ProtocolSCTP)), SetType: utilipset.HashIPPort, }, { IP: "100.101.102.105", Port: 3001, Protocol: strings.ToLower(string(v1.ProtocolSCTP)), SetType: utilipset.HashIPPort, }, }, }, }, { name: "node port service with protocol sctp and externalTrafficPolicy local", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "10.20.30.41" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolSCTP, NodePort: int32(3001), }} svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal }), }, endpoints: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"10.180.0.1"}, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.180.1.1"}, NodeName: ptr.To("otherHost"), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolSCTP), }} }), }, nodeIPs: []string{"100.101.102.103"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "10.20.30.41", Port: 80, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("10.20.30.41"), Protocol: "SCTP", Port: uint16(80), Scheduler: "rr", }, { IP: "100.101.102.103", Port: 3001, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("100.101.102.103"), Protocol: "SCTP", Port: uint16(3001), Scheduler: "rr", 
}, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "10.20.30.41", Port: 80, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, { Address: netutils.ParseIPSloppy("10.180.1.1"), Port: uint16(80), Weight: 1, }, }, { IP: "100.101.102.103", Port: 3001, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, }, }, expectedIPSets: netlinktest.ExpectedIPSet{ kubeNodePortSetSCTP: { { IP: "100.101.102.103", Port: 3001, Protocol: strings.ToLower(string(v1.ProtocolSCTP)), SetType: utilipset.HashIPPort, }, }, kubeNodePortLocalSetSCTP: { { IP: "100.101.102.103", Port: 3001, Protocol: strings.ToLower(string(v1.ProtocolSCTP)), SetType: utilipset.HashIPPort, }, }, }, expectedIptablesChains: netlinktest.ExpectedIptablesChain{ string(kubeNodePortChain): {{ JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetSCTP, }, { JumpChain: string(kubeMarkMasqChain), MatchSet: kubeNodePortSetSCTP, }, { JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv4Protocol) fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, test.nodePortAddresses, nil) makeServiceMap(fp, test.services...) populateEndpointSlices(fp, test.endpoints...) 
fp.syncProxyRules()

			// The fake IPVS must match the expectation exactly; both states
			// are logged on mismatch to make the difference visible.
			if !reflect.DeepEqual(ipvs, test.expectedIPVS) {
				t.Logf("actual ipvs state: %+v", ipvs)
				t.Logf("expected ipvs state: %+v", test.expectedIPVS)
				t.Errorf("unexpected IPVS state")
			}

			// ipset and iptables expectations are optional per case and are
			// only checked when the case sets them.
			if test.expectedIPSets != nil {
				checkIPSet(t, fp, test.expectedIPSets)
			}

			if test.expectedIptablesChains != nil {
				checkIptables(t, ipt, test.expectedIptablesChains)
			}
		})
	}
}

// TestNodePortIPv6 is table-driven: each case lists services, endpoint
// slices, node IPs and nodePortAddresses, plus the fake IPVS state expected
// after a sync and, optionally, expected ipsets and iptables chains.
func TestNodePortIPv6(t *testing.T) {
	tests := []struct {
		name                   string
		services               []*v1.Service
		endpoints              []*discovery.EndpointSlice
		nodeIPs                []string
		nodePortAddresses      []string
		expectedIPVS           *ipvstest.FakeIPVS
		expectedIPSets         netlinktest.ExpectedIPSet
		expectedIptablesChains netlinktest.ExpectedIptablesChain
	}{
		{
			// One IPv4 and one IPv6 endpoint slice are supplied for the
			// same service, which has an IPv6 ClusterIP.
			name: "1 service with node port, has 2 endpoints",
			services: []*v1.Service{
				makeTestService("ns1", "svc1", func(svc *v1.Service) {
					svc.Spec.Type = "NodePort"
					svc.Spec.ClusterIP = "2020::1"
					svc.Spec.Ports = []v1.ServicePort{{
						Name:     "p80",
						Port:     int32(80),
						Protocol: v1.ProtocolTCP,
						NodePort: int32(3001),
					}}
				}),
			},
			endpoints: []*discovery.EndpointSlice{
				makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
					eps.AddressType = discovery.AddressTypeIPv4
					eps.Endpoints = []discovery.Endpoint{{
						Addresses: []string{"10.180.0.1"},
					}}
					eps.Ports = []discovery.EndpointPort{{
						Name:     ptr.To("p80"),
						Port:     ptr.To[int32](80),
						Protocol: ptr.To(v1.ProtocolTCP),
					}}
				}),
				makeTestEndpointSlice("ns1", "svc1", 2, func(eps *discovery.EndpointSlice) {
					eps.AddressType = discovery.AddressTypeIPv6
					eps.Endpoints = []discovery.Endpoint{{
						Addresses: []string{"1002:ab8::2:10"},
					}}
					eps.Ports = []discovery.EndpointPort{{
						Name:     ptr.To("p80"),
						Port:     ptr.To[int32](80),
						Protocol: ptr.To(v1.ProtocolTCP),
					}}
				}),
			},
			nodeIPs:           []string{"100.101.102.103", "2001:db8::1:1"},
			nodePortAddresses: []string{},
			expectedIPVS: &ipvstest.FakeIPVS{
				Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
					{
						IP:       "2001:db8::1:1",
						Port:     3001,
						Protocol: "TCP",
					}: {
						Address:  netutils.ParseIPSloppy("2001:db8::1:1"),
						Protocol: "TCP",
						Port:
uint16(3001), Scheduler: "rr", }, { IP: "2020::1", Port: 80, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("2020::1"), Protocol: "TCP", Port: uint16(80), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "2001:db8::1:1", Port: 3001, Protocol: "TCP", }: { { Address: netutils.ParseIPSloppy("1002:ab8::2:10"), Port: uint16(80), Weight: 1, }, }, { IP: "2020::1", Port: 80, Protocol: "TCP", }: { { Address: netutils.ParseIPSloppy("1002:ab8::2:10"), Port: uint16(80), Weight: 1, }, }, }, }, }, { name: "1 UDP service with node port, has endpoints (no action on IPv6 Proxier)", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "10.20.30.41" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolUDP, NodePort: int32(3001), }} }), }, endpoints: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv6 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"10.180.0.1"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolUDP), }} }), }, nodeIPs: []string{"100.101.102.103"}, nodePortAddresses: []string{"0.0.0.0/0"}, /*since this is a node with only IPv4, proxier should not do anything */ expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{}, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{}, }, expectedIPSets: nil, expectedIptablesChains: nil, }, { name: "service has node port but no endpoints", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "2020::1" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolTCP, NodePort: int32(3001), }} }), }, endpoints: []*discovery.EndpointSlice{}, nodeIPs: 
[]string{"100.101.102.103", "2001:db8::1:1"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "2001:db8::1:1", Port: 3001, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("2001:db8::1:1"), Protocol: "TCP", Port: uint16(3001), Scheduler: "rr", }, { IP: "2020::1", Port: 80, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("2020::1"), Protocol: "TCP", Port: uint16(80), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "2020::1", Port: 80, Protocol: "TCP", }: {}, // no real servers corresponding to no endpoints { IP: "2001:db8::1:1", Port: 3001, Protocol: "TCP", }: {}, // no real servers corresponding to no endpoints }, }, }, { name: "node port service with protocol sctp on a node with multiple nodeIPs", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = "2020::1" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolSCTP, NodePort: int32(3001), }} }), }, endpoints: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv6 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"2001::1"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolSCTP), }} }), }, nodeIPs: []string{"2001:db8::1:1", "2001:db8::1:2"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "2001:db8::1:1", Port: 3001, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("2001:db8::1:1"), Protocol: "SCTP", Port: uint16(3001), Scheduler: "rr", }, { IP: "2001:db8::1:2", Port: 3001, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("2001:db8::1:2"), Protocol: "SCTP", Port: uint16(3001), Scheduler: "rr", }, { IP: "2020::1", 
Port: 80, Protocol: "SCTP", }: { Address: netutils.ParseIPSloppy("2020::1"), Protocol: "SCTP", Port: uint16(80), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "2001:db8::1:1", Port: 3001, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("2001::1"), Port: uint16(80), Weight: 1, }, }, { IP: "2001:db8::1:2", Port: 3001, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("2001::1"), Port: uint16(80), Weight: 1, }, }, { IP: "2020::1", Port: 80, Protocol: "SCTP", }: { { Address: netutils.ParseIPSloppy("2001::1"), Port: uint16(80), Weight: 1, }, }, }, }, expectedIPSets: netlinktest.ExpectedIPSet{ kubeNodePortSetSCTP: { { IP: "2001:db8::1:1", Port: 3001, Protocol: strings.ToLower(string(v1.ProtocolSCTP)), SetType: utilipset.HashIPPort, }, { IP: "2001:db8::1:2", Port: 3001, Protocol: strings.ToLower(string(v1.ProtocolSCTP)), SetType: utilipset.HashIPPort, }, }, }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv6Protocol) fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv6Protocol, test.nodePortAddresses, nil) makeServiceMap(fp, test.services...) populateEndpointSlices(fp, test.endpoints...) 
fp.syncProxyRules()

			// The fake IPVS must match the expectation exactly; both states
			// are logged on mismatch to make the difference visible.
			if !reflect.DeepEqual(ipvs, test.expectedIPVS) {
				t.Logf("actual ipvs state: %+v", ipvs)
				t.Logf("expected ipvs state: %+v", test.expectedIPVS)
				t.Errorf("unexpected IPVS state")
			}

			// ipset and iptables expectations are optional per case and are
			// only checked when the case sets them.
			if test.expectedIPSets != nil {
				checkIPSet(t, fp, test.expectedIPSets)
			}

			if test.expectedIptablesChains != nil {
				checkIptables(t, ipt, test.expectedIptablesChains)
			}
		})
	}
}

// Test_syncEndpoint_updateWeightsOnRestart syncs one ClusterIP service with
// one endpoint, forces the weight of every real server to 0, sets
// fp.initialSync and calls syncEndpoint again. Every real server listed
// afterwards must have weight 1.
//
// Setup failures abort the test: continuing would pass a nil virtual server
// or stale real servers into the following calls.
func Test_syncEndpoint_updateWeightsOnRestart(t *testing.T) {
	ipt := iptablestest.NewFake()
	ipvs := ipvstest.NewFake()
	ipset := ipsettest.NewFake(testIPSetVersion)
	fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)

	svc1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
		svc.Spec.ClusterIP = "10.20.30.41"
		svc.Spec.Ports = []v1.ServicePort{{
			Name:     "p80",
			Port:     int32(80),
			Protocol: v1.ProtocolTCP,
		}}
	})
	epSlice1 := makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
		eps.AddressType = discovery.AddressTypeIPv4
		eps.Endpoints = []discovery.Endpoint{{
			Addresses: []string{"10.180.0.1"},
		}}
		eps.Ports = []discovery.EndpointPort{{
			Name:     ptr.To("p80"),
			Port:     ptr.To[int32](80),
			Protocol: ptr.To(v1.ProtocolTCP),
		}}
	})

	// sync proxy rules to get to the desired initial state
	makeServiceMap(fp, svc1)
	makeEndpointSliceMap(fp, epSlice1)
	fp.syncProxyRules()

	serv := &utilipvs.VirtualServer{
		Address:   netutils.ParseIPSloppy("10.20.30.41"),
		Port:      uint16(80),
		Protocol:  string(v1.ProtocolTCP),
		Scheduler: fp.ipvsScheduler,
	}

	vs, err := fp.ipvs.GetVirtualServer(serv)
	if err != nil {
		t.Fatalf("failed to get virtual server, err: %v", err)
	}

	rss, err := fp.ipvs.GetRealServers(vs)
	if err != nil {
		t.Fatalf("failed to get real servers, err: %v", err)
	}
	// Zero the weights so the later sync has something to restore.
	for _, rs := range rss {
		rs.Weight = 0
		if err = fp.ipvs.UpdateRealServer(vs, rs); err != nil {
			t.Fatalf("failed to update real server: %v, err: %v", rs, err)
		}
	}

	// simulate a restart by enabling initial sync logic.
	fp.initialSync = true
	// NOTE(review): the port name here is "80" while the service and endpoint
	// slice above use "p80" — confirm this is intended; if no endpoints match,
	// the weight check below may have nothing to iterate over.
	err = fp.syncEndpoint(proxy.ServicePortName{
		NamespacedName: types.NamespacedName{
			Name:      "svc1",
			Namespace: "ns1",
		},
		Port:     "80",
		Protocol: v1.ProtocolTCP,
	}, true, vs)
	if err != nil {
		t.Fatalf("failed to sync endpoint, err: %v", err)
	}

	rss, err = fp.ipvs.GetRealServers(vs)
	if err != nil {
		t.Fatalf("failed to get real server, err: %v", err)
	}
	for _, rs := range rss {
		if rs.Weight != 1 {
			t.Logf("unexpected realserver weight: %d, expected weight: 1", rs.Weight)
			t.Errorf("unexpected realserver state")
		}
	}
}

// TestIPv4Proxier is table-driven: each case lists services and endpoint
// slices and the fake IPVS state expected after a sync.
func TestIPv4Proxier(t *testing.T) {
	tests := []struct {
		name         string
		services     []*v1.Service
		endpoints    []*discovery.EndpointSlice
		expectedIPVS *ipvstest.FakeIPVS
	}{
		{
			// svc1 has an IPv4 ClusterIP and endpoint; svc2 has an IPv6
			// ClusterIP and endpoint.
			name: "2 services with Cluster IP, each with endpoints",
			services: []*v1.Service{
				makeTestService("ns1", "svc1", func(svc *v1.Service) {
					svc.Spec.ClusterIP = "10.20.30.41"
					svc.Spec.Ports = []v1.ServicePort{{
						Name:     "p80",
						Port:     int32(80),
						Protocol: v1.ProtocolTCP,
					}}
				}),
				makeTestService("ns2", "svc2", func(svc *v1.Service) {
					svc.Spec.ClusterIP = "1002:ab8::2:1"
					svc.Spec.Ports = []v1.ServicePort{{
						Name:     "p8080",
						Port:     int32(8080),
						Protocol: v1.ProtocolTCP,
					}}
				}),
			},
			endpoints: []*discovery.EndpointSlice{
				makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
					eps.AddressType = discovery.AddressTypeIPv4
					eps.Endpoints = []discovery.Endpoint{{
						Addresses: []string{"10.180.0.1"},
					}}
					eps.Ports = []discovery.EndpointPort{{
						Name:     ptr.To("p80"),
						Port:     ptr.To[int32](80),
						Protocol: ptr.To(v1.ProtocolTCP),
					}}
				}),
				makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) {
					eps.AddressType = discovery.AddressTypeIPv6
					eps.Endpoints = []discovery.Endpoint{{
						Addresses: []string{"1009:ab8::5:6"},
					}}
					eps.Ports = []discovery.EndpointPort{{
						Name:     ptr.To("p8080"),
						Port:     ptr.To[int32](8080),
						Protocol: ptr.To(v1.ProtocolTCP),
					}}
				}),
			},
			expectedIPVS: &ipvstest.FakeIPVS{
				Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
					{
						IP:   "10.20.30.41",
						Port: 80,
						Protocol:
"TCP", }: { Address: netutils.ParseIPSloppy("10.20.30.41"), Protocol: "TCP", Port: uint16(80), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "10.20.30.41", Port: 80, Protocol: "TCP", }: { { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(80), Weight: 1, }, }, }, }, }, { name: "cluster IP service with no endpoints", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.ClusterIP = "10.20.30.41" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolTCP, }} }), }, endpoints: []*discovery.EndpointSlice{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "10.20.30.41", Port: 80, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("10.20.30.41"), Protocol: "TCP", Port: uint16(80), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "10.20.30.41", Port: 80, Protocol: "TCP", }: {}, }, }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) makeServiceMap(fp, test.services...) populateEndpointSlices(fp, test.endpoints...) 
fp.syncProxyRules() if !reflect.DeepEqual(ipvs, test.expectedIPVS) { t.Logf("actual ipvs state: %v", ipvs) t.Logf("expected ipvs state: %v", test.expectedIPVS) t.Errorf("unexpected IPVS state") } }) } } func TestIPv6Proxier(t *testing.T) { tests := []struct { name string services []*v1.Service endpoints []*discovery.EndpointSlice expectedIPVS *ipvstest.FakeIPVS }{ { name: "2 services with Cluster IP, each with endpoints", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.ClusterIP = "10.20.30.41" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolTCP, }} }), makeTestService("ns2", "svc2", func(svc *v1.Service) { svc.Spec.ClusterIP = "1002:ab8::2:1" svc.Spec.Ports = []v1.ServicePort{{ Name: "p8080", Port: int32(8080), Protocol: v1.ProtocolTCP, }} }), }, endpoints: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"10.180.0.1"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv6 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1009:ab8::5:6"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p8080"), Port: ptr.To[int32](8080), Protocol: ptr.To(v1.ProtocolTCP), }} }), }, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "1002:ab8::2:1", Port: 8080, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("1002:ab8::2:1"), Protocol: "TCP", Port: uint16(8080), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "1002:ab8::2:1", Port: 8080, Protocol: "TCP", }: { { Address: netutils.ParseIPSloppy("1009:ab8::5:6"), Port: uint16(8080), Weight: 1, }, 
}, }, }, }, { name: "cluster IP service with no endpoints", services: []*v1.Service{ makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.ClusterIP = "2001::1" svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: int32(80), Protocol: v1.ProtocolTCP, }} }), }, endpoints: []*discovery.EndpointSlice{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ { IP: "2001::1", Port: 80, Protocol: "TCP", }: { Address: netutils.ParseIPSloppy("2001::1"), Protocol: "TCP", Port: uint16(80), Scheduler: "rr", }, }, Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ { IP: "2001::1", Port: 80, Protocol: "TCP", }: {}, }, }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv6Protocol) makeServiceMap(fp, test.services...) populateEndpointSlices(fp, test.endpoints...) fp.syncProxyRules() if !reflect.DeepEqual(ipvs, test.expectedIPVS) { t.Logf("actual ipvs state: %v", ipvs) t.Logf("expected ipvs state: %v", test.expectedIPVS) t.Errorf("unexpected IPVS state") } }) } } func TestMasqueradeRule(t *testing.T) { for _, testcase := range []bool{false, true} { ipt := iptablestest.NewFake().SetHasRandomFully(testcase) ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) makeServiceMap(fp) fp.syncProxyRules() buf := bytes.NewBuffer(nil) _ = ipt.SaveInto(utiliptables.TableNAT, buf) natRules := strings.Split(buf.String(), "\n") var hasMasqueradeJump, hasMasqRandomFully bool for _, line := range natRules { rule, _ := iptablestest.ParseRule(line, false) if rule != nil && rule.Chain == kubePostroutingChain && rule.Jump != nil && rule.Jump.Value == "MASQUERADE" { hasMasqueradeJump = true if rule.RandomFully != nil { hasMasqRandomFully = true } break } } if !hasMasqueradeJump { 
t.Errorf("Failed to find -j MASQUERADE in %s chain", kubePostroutingChain)
		}
		if hasMasqRandomFully != testcase {
			probs := map[bool]string{false: "found", true: "did not find"}
			t.Errorf("%s --random-fully in -j MASQUERADE rule in %s chain for HasRandomFully()=%v", probs[testcase], kubePostroutingChain, testcase)
		}
	}
}

// TestExternalIPsNoEndpoint syncs a ClusterIP service that carries a single
// external IP but has no endpoint slices, then inspects the fake IPVS state:
// two virtual servers must exist, and the one bound to the external IP must
// have no real servers behind it.
func TestExternalIPsNoEndpoint(t *testing.T) {
	ipt := iptablestest.NewFake()
	ipvs := ipvstest.NewFake()
	ipset := ipsettest.NewFake(testIPSetVersion)
	fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)

	svcIP := "10.20.30.41"
	svcPort := 80
	svcExternalIPs := "50.60.70.81"
	svcPortName := proxy.ServicePortName{
		NamespacedName: makeNSN("ns1", "svc1"),
		Port:           "p80",
	}

	// Only the service is registered; no endpoint slice is ever populated.
	makeServiceMap(fp,
		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
			svc.Spec.Type = "ClusterIP"
			svc.Spec.ClusterIP = svcIP
			svc.Spec.ExternalIPs = []string{svcExternalIPs}
			svc.Spec.Ports = []v1.ServicePort{{
				Name:       svcPortName.Port,
				Port:       int32(svcPort),
				Protocol:   v1.ProtocolTCP,
				TargetPort: intstr.FromInt32(int32(svcPort)),
			}}
		}),
	)
	fp.syncProxyRules()

	// check ipvs service and destinations
	services, err := ipvs.GetVirtualServers()
	if err != nil {
		t.Errorf("Failed to get ipvs services, err: %v", err)
	}
	// NOTE(review): presumably one virtual server for the cluster IP and one
	// for the external IP - confirm against syncProxyRules.
	if len(services) != 2 {
		t.Errorf("Expect 2 ipvs services, got %d", len(services))
	}
	found := false
	for _, svc := range services {
		if svc.Address.String() == svcExternalIPs && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) {
			found = true
			// Surface lookup failures explicitly: with the error dropped, a
			// failed lookup returning no destinations would satisfy the
			// "0 destinations" check below without proving anything.
			destinations, rsErr := ipvs.GetRealServers(svc)
			if rsErr != nil {
				t.Errorf("Failed to get ipvs destinations, err: %v", rsErr)
			}
			if len(destinations) != 0 {
				t.Errorf("Unexpected %d destinations, expect 0 destinations", len(destinations))
			}
			break
		}
	}
	if !found {
		t.Errorf("Expect external ip type service, got none")
	}
}

func TestExternalIPs(t *testing.T) {
	ipt := iptablestest.NewFake()
	ipvs := ipvstest.NewFake()
	ipset := ipsettest.NewFake(testIPSetVersion)
	fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)

	svcIP := "10.20.30.41"
	svcPort := 80
	svcExternalIPs :=
sets.New[string]("50.60.70.81", "2012::51", "127.0.0.1") svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "ClusterIP" svc.Spec.ClusterIP = svcIP svc.Spec.ExternalIPs = svcExternalIPs.UnsortedList() svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(int32(svcPort)), }} }), ) epIP := "10.180.0.1" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolUDP), }} }), ) fp.syncProxyRules() // check ipvs service and destinations services, err := ipvs.GetVirtualServers() if err != nil { t.Errorf("Failed to get ipvs services, err: %v", err) } if len(services) != 3 { // ipvs filters out by ipfamily t.Errorf("Expect 3 ipvs services, got %d", len(services)) } found := false for _, svc := range services { if svcExternalIPs.Has(svc.Address.String()) && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) { found = true destinations, _ := ipvs.GetRealServers(svc) for _, dest := range destinations { if dest.Address.String() != epIP || dest.Port != uint16(svcPort) { t.Errorf("service Endpoint mismatch ipvs service destination") } } break } } if !found { t.Errorf("Expect external ip type service, got none") } } func TestOnlyLocalExternalIPs(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) svcIP := "10.20.30.41" svcPort := 80 svcExternalIPs := sets.New[string]("50.60.70.81", 
"2012::51", "127.0.0.1") svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP svc.Spec.ExternalIPs = svcExternalIPs.UnsortedList() svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(int32(svcPort)), }} svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal }), ) epIP := "10.180.0.1" epIP1 := "10.180.1.1" thisHostname := testHostname otherHostname := "other-hostname" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, NodeName: ptr.To(thisHostname), }, { Addresses: []string{epIP1}, NodeName: ptr.To(otherHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() // check ipvs service and destinations services, err := ipvs.GetVirtualServers() if err != nil { t.Errorf("Failed to get ipvs services, err: %v", err) } if len(services) != 3 { // ipvs filters out by IPFamily t.Errorf("Expect 3 ipvs services, got %d", len(services)) } found := false for _, svc := range services { if svcExternalIPs.Has(svc.Address.String()) && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) { found = true destinations, _ := ipvs.GetRealServers(svc) if len(destinations) != 1 { t.Errorf("Expect only 1 local endpoint. 
but got %v", len(destinations)) } for _, dest := range destinations { if dest.Address.String() != epIP || dest.Port != uint16(svcPort) { t.Errorf("service Endpoint mismatch ipvs service destination") } } break } } if !found { t.Errorf("Expect external ip type service, got none") } } func TestLoadBalancer(t *testing.T) { ipt, fp := buildFakeProxier() svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 svcLBIP := "1.2.3.4" svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: svcLBIP, }} }), ) epIP := "10.180.0.1" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolUDP), }} }), ) fp.syncProxyRules() // Expect 2 services and 1 destination epVS := &netlinktest.ExpectedVirtualServer{ VSNum: 2, IP: svcLBIP, Port: uint16(svcNodePort), Protocol: string(v1.ProtocolTCP), RS: []netlinktest.ExpectedRealServer{{ IP: epIP, Port: uint16(svcPort), }}} checkIPVS(t, fp, epVS) // check ipSet rules epIPSet := netlinktest.ExpectedIPSet{ kubeLoadBalancerSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(v1.ProtocolTCP)), SetType: utilipset.HashIPPort, }}, } checkIPSet(t, fp, epIPSet) // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: "RETURN", SourceAddress: "127.0.0.0/8", }, { JumpChain: 
string(kubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, }, { JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet, }, { JumpChain: string(kubeNodePortChain), MatchSet: "", }, { JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }, { JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(kubeLoadBalancerSet): {{ JumpChain: string(kubeMarkMasqChain), MatchSet: "", }}, } checkIptables(t, ipt, epIpt) } func TestOnlyLocalNodePorts(t *testing.T) { nodeIP := netutils.ParseIPSloppy("100.101.102.103") ipt, fp := buildFakeProxier() svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), }} svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal }), ) epIP := "10.180.0.1" epIP1 := "10.180.1.1" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, NodeName: ptr.To(testHostname), }, { Addresses: []string{epIP1}, NodeName: ptr.To("other-hostname"), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0} addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("100.101.102.103"), Mask: net.CIDRMask(24, 32)}} itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0} addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}} 
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs) fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1) fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"100.101.102.0/24"}, nil) fp.syncProxyRules() // Expect 2 services and 1 destination epVS := &netlinktest.ExpectedVirtualServer{ VSNum: 2, IP: nodeIP.String(), Port: uint16(svcNodePort), Protocol: string(v1.ProtocolTCP), RS: []netlinktest.ExpectedRealServer{{ IP: epIP, Port: uint16(svcPort), }}} checkIPVS(t, fp, epVS) // check ipSet rules epEntry := &utilipset.Entry{ Port: svcNodePort, Protocol: strings.ToLower(string(v1.ProtocolTCP)), SetType: utilipset.BitmapPort, } epIPSet := netlinktest.ExpectedIPSet{ kubeNodePortSetTCP: {epEntry}, kubeNodePortLocalSetTCP: {epEntry}, } checkIPSet(t, fp, epIPSet) // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: "RETURN", SourceAddress: "127.0.0.0/8", }, { JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet, }, { JumpChain: string(kubeNodePortChain), MatchSet: "", }, { JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }}, string(kubeNodePortChain): {{ JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP, }, { JumpChain: string(kubeMarkMasqChain), MatchSet: kubeNodePortSetTCP, }, { JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, } checkIptables(t, ipt, epIpt) } func TestHealthCheckNodePort(t *testing.T) { ipt, fp := buildFakeProxier() svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3000 svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } sampleSvc := makeTestService(svcPortName.Namespace, "", func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), }} svc.Spec.ExternalTrafficPolicy = 
v1.ServiceExternalTrafficPolicyLocal }) svc1, svc2, invalidSvc3 := *sampleSvc, *sampleSvc, *sampleSvc svc1.Name, svc1.Spec.HealthCheckNodePort = "valid-svc1", 30000 svc2.Name, svc2.Spec.HealthCheckNodePort = "valid-svc2", 30001 // make svc3 invalid by setting external traffic policy to cluster invalidSvc3.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster invalidSvc3.Name, invalidSvc3.Spec.HealthCheckNodePort = "invalid-svc3", 30002 makeServiceMap(fp, &svc1, &svc2, &invalidSvc3, ) itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0} addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("100.101.102.103"), Mask: net.CIDRMask(24, 32)}} itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0} addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}} fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs) fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1) fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"100.101.102.0/24"}, nil) fp.syncProxyRules() // check ipSet rules makeTCPEntry := func(port int) *utilipset.Entry { return &utilipset.Entry{ Port: port, Protocol: strings.ToLower(string(v1.ProtocolTCP)), SetType: utilipset.BitmapPort, } } epIPSet := netlinktest.ExpectedIPSet{ // healthcheck node port set should only contain valid HC node ports kubeHealthCheckNodePortSet: {makeTCPEntry(30000), makeTCPEntry(30001)}, } checkIPSet(t, fp, epIPSet) // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeNodePortChain): {{ JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP, }, { JumpChain: string(kubeMarkMasqChain), MatchSet: kubeNodePortSetTCP, }, { JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, } checkIptables(t, ipt, epIpt) } func TestLoadBalancerSourceRanges(t *testing.T) { ipt, fp := buildFakeProxier() svcIP := 
"10.20.30.41" svcPort := 80 svcLBIP := "1.2.3.4" svcLBSource := "10.0.0.0/8" svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } epIP := "10.180.0.1" makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: svcLBIP, }} svc.Spec.LoadBalancerSourceRanges = []string{ svcLBSource, } }), ) populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() // Check ipvs service and destinations epVS := &netlinktest.ExpectedVirtualServer{ VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(v1.ProtocolTCP), RS: []netlinktest.ExpectedRealServer{{ IP: epIP, Port: uint16(svcPort), }}} checkIPVS(t, fp, epVS) // Check ipset entry epIPSet := netlinktest.ExpectedIPSet{ kubeLoadBalancerSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(v1.ProtocolTCP)), SetType: utilipset.HashIPPort, }}, kubeLoadBalancerFWSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(v1.ProtocolTCP)), SetType: utilipset.HashIPPort, }}, kubeLoadBalancerSourceCIDRSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(v1.ProtocolTCP)), Net: svcLBSource, SetType: utilipset.HashIPPortNet, }}, } checkIPSet(t, fp, epIPSet) // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: "RETURN", SourceAddress: "127.0.0.0/8", }, { JumpChain: 
string(kubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, }, { JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet, }, { JumpChain: string(kubeNodePortChain), MatchSet: "", }, { JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }, { JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(kubeProxyFirewallChain): {{ JumpChain: string(kubeSourceRangesFirewallChain), MatchSet: kubeLoadBalancerFWSet, }}, string(kubeSourceRangesFirewallChain): {{ JumpChain: "RETURN", MatchSet: kubeLoadBalancerSourceCIDRSet, }, { JumpChain: "DROP", MatchSet: "", }}, } checkIptables(t, ipt, epIpt) } func TestAcceptIPVSTraffic(t *testing.T) { ipt, fp := buildFakeProxier() ingressIP := "1.2.3.4" externalIP := []string{"5.6.7.8"} svcInfos := []struct { svcType v1.ServiceType svcIP string svcName string epIP string }{ {v1.ServiceTypeClusterIP, "10.20.30.40", "svc1", "10.180.0.1"}, {v1.ServiceTypeLoadBalancer, "10.20.30.41", "svc2", "10.180.0.2"}, {v1.ServiceTypeNodePort, "10.20.30.42", "svc3", "10.180.0.3"}, } for _, svcInfo := range svcInfos { makeServiceMap(fp, makeTestService("ns1", svcInfo.svcName, func(svc *v1.Service) { svc.Spec.Type = svcInfo.svcType svc.Spec.ClusterIP = svcInfo.svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: "p80", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 80, }} if svcInfo.svcType == v1.ServiceTypeLoadBalancer { svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: ingressIP, }} } if svcInfo.svcType == v1.ServiceTypeClusterIP { svc.Spec.ExternalIPs = externalIP } }), ) populateEndpointSlices(fp, makeTestEndpointSlice("ns1", "p80", 1, func(eps *discovery.EndpointSlice) { eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{svcInfo.epIP}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolUDP), }} }), ) } fp.syncProxyRules() // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): { {JumpChain: "RETURN", 
SourceAddress: "127.0.0.0/8"}, {JumpChain: string(kubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet}, {JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet}, {JumpChain: string(kubeMarkMasqChain), MatchSet: kubeExternalIPSet}, {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With externalTrafficOnlyArgs {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With dstLocalOnlyArgs {JumpChain: string(kubeNodePortChain), MatchSet: ""}, {JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet}, {JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet}, }, } checkIptables(t, ipt, epIpt) } func TestOnlyLocalLoadBalancing(t *testing.T) { ipt, fp := buildFakeProxier() svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 svcLBIP := "1.2.3.4" svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: svcLBIP, }} svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal }), ) epIP := "10.180.0.1" epIP1 := "10.180.1.1" populateEndpointSlices(fp, makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{ { // **local** endpoint address, should be added as RS Addresses: []string{epIP}, NodeName: ptr.To(testHostname), }, { // **remote** endpoint address, should not be added as RS Addresses: []string{epIP1}, NodeName: ptr.To("other-hostname"), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To(svcPortName.Port), Port: ptr.To(int32(svcPort)), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() // Expect 2 services and 1 
destination epVS := &netlinktest.ExpectedVirtualServer{ VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(v1.ProtocolTCP), RS: []netlinktest.ExpectedRealServer{{ IP: epIP, Port: uint16(svcPort), }}} checkIPVS(t, fp, epVS) // check ipSet rules epIPSet := netlinktest.ExpectedIPSet{ kubeLoadBalancerSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(v1.ProtocolTCP)), SetType: utilipset.HashIPPort, }}, kubeLoadBalancerLocalSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(v1.ProtocolTCP)), SetType: utilipset.HashIPPort, }}, } checkIPSet(t, fp, epIPSet) // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: "RETURN", SourceAddress: "127.0.0.0/8", }, { JumpChain: string(kubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, }, { JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet, }, { JumpChain: string(kubeNodePortChain), MatchSet: "", }, { JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }, { JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(kubeLoadBalancerChain): {{ JumpChain: "RETURN", MatchSet: kubeLoadBalancerLocalSet, }, { JumpChain: string(kubeMarkMasqChain), MatchSet: "", }}, } checkIptables(t, ipt, epIpt) } func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort { svcPort := v1.ServicePort{ Name: name, Protocol: protocol, Port: port, NodePort: nodeport, TargetPort: intstr.FromInt32(int32(targetPort)), } return append(array, svcPort) } func TestBuildServiceMapAddRemove(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) services := []*v1.Service{ makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.Ports = 
addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somesctp", "SCTP", 1236, 6321, 0) }), makeTestService("somewhere-else", "node-port", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeNodePort svc.Spec.ClusterIP = "172.16.55.10" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpblah", "SCTP", 343, 676, 0) }), makeTestService("somewhere", "load-balancer", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ClusterIP = "172.16.55.11" svc.Spec.LoadBalancerIP = "5.6.7.8" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpfoo", "SCTP", 8677, 30063, 7002) svc.Status.LoadBalancer = v1.LoadBalancerStatus{ Ingress: []v1.LoadBalancerIngress{ {IP: "10.1.2.4"}, }, } }), makeTestService("somewhere", "only-local-load-balancer", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ClusterIP = "172.16.55.12" svc.Spec.LoadBalancerIP = "5.6.7.8" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpbaz", "SCTP", 8679, 30065, 7004) svc.Status.LoadBalancer = v1.LoadBalancerStatus{ Ingress: []v1.LoadBalancerIngress{ {IP: "10.1.2.3"}, }, } svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal svc.Spec.HealthCheckNodePort = 345 }), } for i := range services { fp.OnServiceAdd(services[i]) } result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 12 { t.Errorf("expected service map length 12, got %v", 
fp.svcPortMap) } if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // The only-local-loadbalancer ones get added healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 1 { t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts) } else { nsn := makeNSN("somewhere", "only-local-load-balancer") if port, found := healthCheckNodePorts[nsn]; !found || port != 345 { t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts) } } // Remove some stuff // oneService is a modification of services[0] with removed first port. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0) }) fp.OnServiceUpdate(services[0], oneService) fp.OnServiceDelete(services[1]) fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of its ServicePorts was deleted. 
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) { t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.UnsortedList()) } for _, ip := range expectedStaleUDPServices { if !result.DeletedUDPClusterIPs.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts) } } func TestBuildServiceMapServiceHeadless(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) makeServiceMap(fp, makeTestService("somewhere-else", "headless", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = v1.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) }), makeTestService("somewhere-else", "headless-without-port", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = v1.ClusterIPNone }), makeTestService("somewhere-else", "headless-sctp", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = v1.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 1235, 0, 0) }), ) // Headless service should be ignored result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // No proxied services, so no healthchecks healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, 
got %d", len(healthCheckNodePorts)) } } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) makeServiceMap(fp, makeTestService("somewhere-else", "external-name", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeExternalName svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored svc.Spec.ExternalName = "foo2.bar.com" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0) }), ) result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs) } // No proxied services, so no healthchecks healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) } } func TestBuildServiceMapServiceUpdate(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0) }) servicev2 := makeTestService("somewhere", "some-service", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.LoadBalancerIP = "5.6.7.8" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003) 
svc.Status.LoadBalancer = v1.LoadBalancerStatus{
			Ingress: []v1.LoadBalancerIngress{
				{IP: "10.1.2.3"},
			},
		}
		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
		svc.Spec.HealthCheckNodePort = 345
	})

	fp.OnServiceAdd(servicev1)

	result := fp.svcPortMap.Update(fp.serviceChanges)
	if len(fp.svcPortMap) != 2 {
		t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
	}
	if len(result.DeletedUDPClusterIPs) != 0 {
		// Services only added, so nothing stale yet
		t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
	}

	healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
	if len(healthCheckNodePorts) != 0 {
		t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
	}

	// Change service to load-balancer
	fp.OnServiceUpdate(servicev1, servicev2)
	result = fp.svcPortMap.Update(fp.serviceChanges)
	if len(fp.svcPortMap) != 2 {
		t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
	}
	if len(result.DeletedUDPClusterIPs) != 0 {
		t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
	}

	healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
	if len(healthCheckNodePorts) != 1 {
		t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
	}

	// No change; make sure the service map stays the same and there are
	// no health-check changes
	fp.OnServiceUpdate(servicev2, servicev2)
	result = fp.svcPortMap.Update(fp.serviceChanges)
	if len(fp.svcPortMap) != 2 {
		t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
	}
	if len(result.DeletedUDPClusterIPs) != 0 {
		t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
	}

	healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
	if len(healthCheckNodePorts) != 1 {
		t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
	}

	// And back to ClusterIP
	fp.OnServiceUpdate(servicev2, servicev1)
	result = fp.svcPortMap.Update(fp.serviceChanges)
	if len(fp.svcPortMap) != 2 {
		t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
	}
	if len(result.DeletedUDPClusterIPs) != 0 {
		// Same ClusterIP and ports as before, so nothing stale
		t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
	}

	healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
	if len(healthCheckNodePorts) != 0 {
		t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
	}
}

// TestSessionAffinity syncs a NodePort service that uses ClientIP session
// affinity with the default timeout and checks that every virtual server
// reported by the fake IPVS carries that timeout.
func TestSessionAffinity(t *testing.T) {
	ipt := iptablestest.NewFake()
	ipvs := ipvstest.NewFake()
	ipset := ipsettest.NewFake(testIPSetVersion)
	nodeIP := "100.101.102.103"
	fp := NewFakeProxier(ipt, ipvs, ipset, []string{nodeIP}, nil, v1.IPv4Protocol)
	svcIP := "10.20.30.41"
	svcPort := 80
	svcNodePort := 3001
	svcExternalIPs := "50.60.70.81"
	svcPortName := proxy.ServicePortName{
		NamespacedName: makeNSN("ns1", "svc1"),
		Port:           "p80",
	}

	makeServiceMap(fp,
		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
			// Use the typed constant, like the other tests in this file.
			svc.Spec.Type = v1.ServiceTypeNodePort
			svc.Spec.ClusterIP = svcIP
			svc.Spec.ExternalIPs = []string{svcExternalIPs}
			svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
			svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
				ClientIP: &v1.ClientIPConfig{
					TimeoutSeconds: ptr.To[int32](v1.DefaultClientIPServiceAffinitySeconds),
				},
			}
			svc.Spec.Ports = []v1.ServicePort{{
				Name:     svcPortName.Port,
				Port:     int32(svcPort),
				Protocol: v1.ProtocolTCP,
				NodePort: int32(svcNodePort),
			}}
		}),
	)
	fp.syncProxyRules()

	// check ipvs service and destinations
	services, err := ipvs.GetVirtualServers()
	if err != nil {
		// Nothing below is meaningful without the service list.
		t.Fatalf("Failed to get ipvs services, err: %v", err)
	}
	// Guard against the loop below passing vacuously when the sync created
	// no virtual server at all.
	if len(services) == 0 {
		t.Fatalf("Expected at least one ipvs service after sync, got none")
	}
	for _, svc := range services {
		if svc.Timeout != uint32(v1.DefaultClientIPServiceAffinitySeconds) {
			t.Errorf("Unexpected mismatch ipvs service session affinity timeout: %d, expected: %d", svc.Timeout, v1.DefaultClientIPServiceAffinitySeconds)
		}
	}
}

// makeServicePortName builds a proxy.ServicePortName from a namespace, a
// service name, a port name and a protocol.
func makeServicePortName(ns, name, port string, protocol v1.Protocol) proxy.ServicePortName {
	return proxy.ServicePortName{
		NamespacedName: makeNSN(ns, name),
		Port:           port,
		Protocol:       protocol,
	}
}

// Test_updateEndpointsMap is a table-driven test: each case feeds a "previous"
// and a "current" list of EndpointSlices through the proxier's EndpointSlice
// handlers and checks the resulting endpoints map, the deleted UDP endpoints,
// the newly active UDP services and the local ready endpoint counts.
func Test_updateEndpointsMap(t *testing.T) {
	emptyEndpointSlices := []*discovery.EndpointSlice{
		makeTestEndpointSlice("ns1", "ep1", 1, func(*discovery.EndpointSlice) {}),
	}
	subset1 := func(eps *discovery.EndpointSlice) {
		eps.AddressType = discovery.AddressTypeIPv4
		eps.Endpoints = []discovery.Endpoint{{
			Addresses: []string{"1.1.1.1"},
		}}
		eps.Ports = []discovery.EndpointPort{{
			Name:     ptr.To("p11"),
			Port:     ptr.To[int32](11),
			Protocol: ptr.To(v1.ProtocolUDP),
		}}
	}
	subset2 := func(eps *discovery.EndpointSlice) {
		eps.AddressType = discovery.AddressTypeIPv4
		eps.Endpoints = []discovery.Endpoint{{
			Addresses: []string{"1.1.1.2"},
		}}
		eps.Ports = []discovery.EndpointPort{{
			Name:     ptr.To("p12"),
			Port:     ptr.To[int32](12),
			Protocol: ptr.To(v1.ProtocolUDP),
		}}
	}
	namedPortLocal := []*discovery.EndpointSlice{
		makeTestEndpointSlice("ns1", "ep1", 1,
			func(eps *discovery.EndpointSlice) {
				eps.AddressType = discovery.AddressTypeIPv4
				eps.Endpoints = []discovery.Endpoint{{
					Addresses: []string{"1.1.1.1"},
					NodeName:  ptr.To(testHostname),
				}}
				eps.Ports = []discovery.EndpointPort{{
					Name:     ptr.To("p11"),
					Port:     ptr.To[int32](11),
					Protocol: ptr.To(v1.ProtocolUDP),
				}}
			}),
	}
	namedPort := []*discovery.EndpointSlice{
		makeTestEndpointSlice("ns1", "ep1", 1, subset1),
	}
	namedPortRenamed := []*discovery.EndpointSlice{
		makeTestEndpointSlice("ns1", "ep1", 1,
			func(eps *discovery.EndpointSlice) {
				eps.AddressType = discovery.AddressTypeIPv4
				eps.Endpoints = []discovery.Endpoint{{
					Addresses: []string{"1.1.1.1"},
				}}
				eps.Ports = []discovery.EndpointPort{{
					Name:     ptr.To("p11-2"),
					Port:     ptr.To[int32](11),
					Protocol: ptr.To(v1.ProtocolUDP),
				}}
			}),
	}
	namedPortRenumbered := []*discovery.EndpointSlice{
		makeTestEndpointSlice("ns1", "ep1", 1,
			func(eps *discovery.EndpointSlice) {
				eps.AddressType = discovery.AddressTypeIPv4
				eps.Endpoints = []discovery.Endpoint{{
					Addresses: []string{"1.1.1.1"},
				}}
				eps.Ports =
[]discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](22), Protocol: ptr.To(v1.ProtocolUDP), }} }), } namedPortsLocalNoLocal := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.1"}, }, { Addresses: []string{"1.1.1.2"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} }), } multipleSubsets := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subset1), makeTestEndpointSlice("ns1", "ep1", 2, subset2), } subsetLocal := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.2"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} } multipleSubsetsWithLocal := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subset1), makeTestEndpointSlice("ns1", "ep1", 2, subsetLocal), } subsetMultiplePortsLocal := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.1"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} } subset3 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.3"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p13"), Port: ptr.To[int32](13), Protocol: ptr.To(v1.ProtocolUDP), }} } 
multipleSubsetsMultiplePortsLocal := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subsetMultiplePortsLocal), makeTestEndpointSlice("ns1", "ep1", 2, subset3), } subsetMultipleIPsPorts1 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.1"}, }, { Addresses: []string{"1.1.1.2"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }} } subsetMultipleIPsPorts2 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.3"}, }, { Addresses: []string{"1.1.1.4"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p13"), Port: ptr.To[int32](13), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p14"), Port: ptr.To[int32](14), Protocol: ptr.To(v1.ProtocolUDP), }} } subsetMultipleIPsPorts3 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"2.2.2.1"}, }, { Addresses: []string{"2.2.2.2"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p21"), Port: ptr.To[int32](21), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p22"), Port: ptr.To[int32](22), Protocol: ptr.To(v1.ProtocolUDP), }} } multipleSubsetsIPsPorts := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subsetMultipleIPsPorts1), makeTestEndpointSlice("ns1", "ep1", 2, subsetMultipleIPsPorts2), makeTestEndpointSlice("ns2", "ep2", 1, subsetMultipleIPsPorts3), } complexSubset1 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"2.2.2.2"}, NodeName: 
ptr.To(testHostname), }, { Addresses: []string{"2.2.2.22"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p22"), Port: ptr.To[int32](22), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset2 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"2.2.2.3"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p23"), Port: ptr.To[int32](23), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset3 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"4.4.4.4"}, NodeName: ptr.To(testHostname), }, { Addresses: []string{"4.4.4.5"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p44"), Port: ptr.To[int32](44), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset4 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"4.4.4.6"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p45"), Port: ptr.To[int32](45), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset5 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.1"}, }, { Addresses: []string{"1.1.1.11"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p11"), Port: ptr.To[int32](11), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset6 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.2"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p12"), Port: ptr.To[int32](12), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("p122"), Port: ptr.To[int32](122), Protocol: ptr.To(v1.ProtocolUDP), }} } 
complexSubset7 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"3.3.3.3"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p33"), Port: ptr.To[int32](33), Protocol: ptr.To(v1.ProtocolUDP), }} } complexSubset8 := func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"4.4.4.4"}, NodeName: ptr.To(testHostname), }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p44"), Port: ptr.To[int32](44), Protocol: ptr.To(v1.ProtocolUDP), }} } complexBefore := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, subset1), nil, makeTestEndpointSlice("ns2", "ep2", 1, complexSubset1), makeTestEndpointSlice("ns2", "ep2", 2, complexSubset2), nil, makeTestEndpointSlice("ns4", "ep4", 1, complexSubset3), makeTestEndpointSlice("ns4", "ep4", 2, complexSubset4), } complexAfter := []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, complexSubset5), makeTestEndpointSlice("ns1", "ep1", 2, complexSubset6), nil, nil, makeTestEndpointSlice("ns3", "ep3", 1, complexSubset7), makeTestEndpointSlice("ns4", "ep4", 1, complexSubset8), nil, } testCases := []struct { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. 
name string previousEndpoints []*discovery.EndpointSlice currentEndpoints []*discovery.EndpointSlice oldEndpoints map[proxy.ServicePortName][]endpointExpectation expectedResult map[proxy.ServicePortName][]endpointExpectation expectedDeletedUDPEndpoints []proxy.ServiceEndpoint expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool expectedReadyEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing name: "nothing", oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", previousEndpoints: namedPortLocal, currentEndpoints: namedPortLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[2]: no change, multiple subsets name: "no change, multiple subsets", previousEndpoints: multipleSubsets, currentEndpoints: multipleSubsets, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.2:12", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { 
{endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.2:12", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", previousEndpoints: multipleSubsetsMultiplePortsLocal, currentEndpoints: multipleSubsetsMultiplePortsLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.1:12", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { {endpoint: "1.1.1.3:13", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.1:12", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { {endpoint: "1.1.1.3:13", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[4]: no change, multiple endpoints, subsets, IPs, and ports name: "no change, multiple endpoints, subsets, IPs, and ports", previousEndpoints: multipleSubsetsIPsPorts, currentEndpoints: multipleSubsetsIPsPorts, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, {endpoint: "1.1.1.2:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", 
"p12", v1.ProtocolUDP): { {endpoint: "1.1.1.1:12", isLocal: false}, {endpoint: "1.1.1.2:12", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { {endpoint: "1.1.1.3:13", isLocal: false}, {endpoint: "1.1.1.4:13", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): { {endpoint: "1.1.1.3:14", isLocal: false}, {endpoint: "1.1.1.4:14", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): { {endpoint: "2.2.2.1:21", isLocal: false}, {endpoint: "2.2.2.2:21", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { {endpoint: "2.2.2.1:22", isLocal: false}, {endpoint: "2.2.2.2:22", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, {endpoint: "1.1.1.2:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.1:12", isLocal: false}, {endpoint: "1.1.1.2:12", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { {endpoint: "1.1.1.3:13", isLocal: false}, {endpoint: "1.1.1.4:13", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): { {endpoint: "1.1.1.3:14", isLocal: false}, {endpoint: "1.1.1.4:14", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): { {endpoint: "2.2.2.1:21", isLocal: false}, {endpoint: "2.2.2.2:21", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { {endpoint: "2.2.2.1:22", isLocal: false}, {endpoint: "2.2.2.2:22", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, }, { // Case[5]: add an Endpoints name: "add an Endpoints", previousEndpoints: []*discovery.EndpointSlice{nil}, 
currentEndpoints: namedPortLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[6]: remove an Endpoints name: "remove an Endpoints", previousEndpoints: namedPortLocal, currentEndpoints: []*discovery.EndpointSlice{nil}, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", previousEndpoints: namedPort, currentEndpoints: namedPortsLocalNoLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, {endpoint: "1.1.1.2:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.1:12", isLocal: false}, {endpoint: "1.1.1.2:12", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ 
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[8]: remove an IP and port name: "remove an IP and port", previousEndpoints: namedPortsLocalNoLocal, currentEndpoints: namedPort, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, {endpoint: "1.1.1.2:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.1:12", isLocal: false}, {endpoint: "1.1.1.2:12", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.2:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }, { Endpoint: "1.1.1.1:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }, { Endpoint: "1.1.1.2:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", previousEndpoints: []*discovery.EndpointSlice{namedPort[0], nil}, currentEndpoints: multipleSubsetsWithLocal, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, 
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { // Case[10]: remove a subset name: "remove a subset", previousEndpoints: multipleSubsets, currentEndpoints: []*discovery.EndpointSlice{namedPort[0], nil}, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.2:12", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.2:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", previousEndpoints: namedPort, currentEndpoints: namedPortRenamed, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", previousEndpoints: 
namedPort, currentEndpoints: namedPortRenumbered, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:22", isLocal: false}, }, }, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", previousEndpoints: complexBefore, currentEndpoints: complexAfter, oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { {endpoint: "2.2.2.22:22", isLocal: true}, {endpoint: "2.2.2.2:22", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): { {endpoint: "2.2.2.3:23", isLocal: true}, }, makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): { {endpoint: "4.4.4.4:44", isLocal: true}, {endpoint: "4.4.4.5:44", isLocal: true}, }, makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP): { {endpoint: "4.4.4.6:45", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]endpointExpectation{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {endpoint: "1.1.1.11:11", isLocal: false}, {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {endpoint: "1.1.1.2:12", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): { {endpoint: "1.1.1.2:122", isLocal: false}, }, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): { {endpoint: "3.3.3.3:33", 
isLocal: false},
				},
				makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
					{endpoint: "4.4.4.4:44", isLocal: true},
				},
			},
			expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
				Endpoint:        "2.2.2.2:22",
				ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
			}, {
				Endpoint:        "2.2.2.22:22",
				ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
			}, {
				Endpoint:        "2.2.2.3:23",
				ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
			}, {
				Endpoint:        "4.4.4.5:44",
				ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
			}, {
				Endpoint:        "4.4.4.6:45",
				ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
			}},
			expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
				makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP):  true,
				makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
				makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP):  true,
			},
			expectedReadyEndpoints: map[types.NamespacedName]int{
				makeNSN("ns4", "ep4"): 1,
			},
		}, {
			// Case[14]: change from 0 endpoint address to 1 named port
			name:              "change from 0 endpoint address to 1 named port",
			previousEndpoints: emptyEndpointSlices,
			currentEndpoints:  namedPort,
			oldEndpoints:      map[proxy.ServicePortName][]endpointExpectation{},
			expectedResult: map[proxy.ServicePortName][]endpointExpectation{
				makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
					{endpoint: "1.1.1.1:11", isLocal: false},
				},
			},
			expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
			expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
				makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
			},
			expectedReadyEndpoints: map[types.NamespacedName]int{},
		},
	}

	for tci, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			ipt := iptablestest.NewFake()
			ipvs := ipvstest.NewFake()
			ipset := ipsettest.NewFake(testIPSetVersion)
			fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
			fp.hostname = testHostname

			// First check that after adding all previous versions of endpoints,
			// fp.endpointsMap matches tc.oldEndpoints.
			for i := range tc.previousEndpoints {
				if tc.previousEndpoints[i] != nil {
					fp.OnEndpointSliceAdd(tc.previousEndpoints[i])
				}
			}
			fp.endpointsMap.Update(fp.endpointsChanges)
			checkEndpointExpectations(t, tci, fp.endpointsMap, tc.oldEndpoints)

			// Now call the appropriate handlers to get to the state we want.
			// The two lists are walked in lockstep, so they must be equally long.
			if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
				t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
			}

			for i := range tc.previousEndpoints {
				prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i]
				switch {
				case prev == nil:
					fp.OnEndpointSliceAdd(curr)
				case curr == nil:
					fp.OnEndpointSliceDelete(prev)
				default:
					fp.OnEndpointSliceUpdate(prev, curr)
				}
			}
			result := fp.endpointsMap.Update(fp.endpointsChanges)
			checkEndpointExpectations(t, tci, fp.endpointsMap, tc.expectedResult)

			// Deleted UDP endpoints: same count, and every expected entry present.
			if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
				t.Errorf("[%d] expected %d deleted UDP endpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
			}
			for _, want := range tc.expectedDeletedUDPEndpoints {
				found := false
				for _, got := range result.DeletedUDPEndpoints {
					if got == want {
						found = true
						break
					}
				}
				if !found {
					t.Errorf("[%d] expected deleted UDP endpoint %v, but didn't find it: %v", tci, want, result.DeletedUDPEndpoints)
				}
			}

			// Newly active UDP services: same count, and every expected name present.
			if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
				t.Errorf("[%d] expected %d newly active UDP services, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
			}
			for want := range tc.expectedNewlyActiveUDPServices {
				found := false
				for _, got := range result.NewlyActiveUDPServices {
					if got == want {
						found = true
						break
					}
				}
				if !found {
					t.Errorf("[%d] expected newly active UDP service %v, but didn't find it: %v", tci, want, result.NewlyActiveUDPServices)
				}
			}

			localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
			if !reflect.DeepEqual(localReadyEndpoints, tc.expectedReadyEndpoints) {
				t.Errorf("[%d] expected local ready endpoints %v, got %v", tci, tc.expectedReadyEndpoints, localReadyEndpoints)
			}
		})
	}
}

// endpointExpectation is the expected string form and locality of a single
// endpoint in a proxy.EndpointsMap.
type endpointExpectation struct {
	endpoint string
	isLocal  bool
}

// checkEndpointExpectations reports a test error for every difference between
// newMap and expected: the number of service ports, the number of endpoints
// per expected service port, and each endpoint's String() and IsLocal()
// values, compared position by position. tci is the test case index used as
// a prefix in the failure messages.
func checkEndpointExpectations(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]endpointExpectation) {
	// Attribute failures to the calling test line rather than to this helper.
	t.Helper()

	if len(newMap) != len(expected) {
		t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
	}
	for x, want := range expected {
		got := newMap[x]
		if len(got) != len(want) {
			t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(want), x, len(got))
			continue
		}
		for i := range want {
			newEp := got[i]
			if newEp.String() != want[i].endpoint ||
				newEp.IsLocal() != want[i].isLocal {
				t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, want[i], newEp)
			}
		}
	}
}

func Test_syncService(t *testing.T) {
	testCases := []struct {
		oldVirtualServer  *utilipvs.VirtualServer
		svcName           string
		newVirtualServer  *utilipvs.VirtualServer
		bindAddr          bool
		alreadyBoundAddrs sets.Set[string]
	}{
		{
			// case 0, old virtual server is same as new virtual server
			oldVirtualServer: &utilipvs.VirtualServer{
				Address:   netutils.ParseIPSloppy("1.2.3.4"),
				Protocol:  string(v1.ProtocolTCP),
				Port:      80,
				Scheduler: "rr",
				Flags:     utilipvs.FlagHashed,
			},
			svcName: "foo",
			newVirtualServer: &utilipvs.VirtualServer{
				Address:   netutils.ParseIPSloppy("1.2.3.4"),
				Protocol:  string(v1.ProtocolTCP),
				Port:      80,
				Scheduler: "rr",
				Flags:     utilipvs.FlagHashed,
			},
			bindAddr:          false,
			alreadyBoundAddrs: nil,
		},
		{
			// case 1, old virtual server is different from new virtual server
			oldVirtualServer: &utilipvs.VirtualServer{
				Address:   netutils.ParseIPSloppy("1.2.3.4"),
				Protocol:  string(v1.ProtocolTCP),
				Port:      8080,
				Scheduler: "rr",
				Flags:     utilipvs.FlagHashed,
			},
			svcName: "bar",
			newVirtualServer: &utilipvs.VirtualServer{
Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolTCP), Port: 8080, Scheduler: "rr", Flags: utilipvs.FlagPersistent, }, bindAddr: false, alreadyBoundAddrs: nil, }, { // case 2, old virtual server is different from new virtual server oldVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolTCP), Port: 8080, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, svcName: "bar", newVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolTCP), Port: 8080, Scheduler: "wlc", Flags: utilipvs.FlagHashed, }, bindAddr: false, alreadyBoundAddrs: nil, }, { // case 3, old virtual server is nil, and create new virtual server oldVirtualServer: nil, svcName: "baz", newVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolUDP), Port: 53, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, bindAddr: true, alreadyBoundAddrs: nil, }, { // case 4, SCTP, old virtual server is same as new virtual server oldVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 80, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, svcName: "foo", newVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 80, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, bindAddr: false, alreadyBoundAddrs: nil, }, { // case 5, old virtual server is different from new virtual server oldVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 8080, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, svcName: "bar", newVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 8080, Scheduler: "rr", Flags: utilipvs.FlagPersistent, }, bindAddr: false, alreadyBoundAddrs: nil, }, { // 
case 6, old virtual server is different from new virtual server oldVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 8080, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, svcName: "bar", newVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 8080, Scheduler: "wlc", Flags: utilipvs.FlagHashed, }, bindAddr: false, alreadyBoundAddrs: nil, }, { // case 7, old virtual server is nil, and create new virtual server oldVirtualServer: nil, svcName: "baz", newVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 53, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, bindAddr: true, alreadyBoundAddrs: sets.New[string](), }, { // case 8, virtual server address already binded, skip sync oldVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 53, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, svcName: "baz", newVirtualServer: &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("1.2.3.4"), Protocol: string(v1.ProtocolSCTP), Port: 53, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, bindAddr: true, alreadyBoundAddrs: sets.New("1.2.3.4"), }, } for i := range testCases { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) proxier.netlinkHandle.EnsureDummyDevice(defaultDummyDevice) if testCases[i].oldVirtualServer != nil { if err := proxier.ipvs.AddVirtualServer(testCases[i].oldVirtualServer); err != nil { t.Errorf("Case [%d], unexpected add IPVS virtual server error: %v", i, err) } } if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr, testCases[i].alreadyBoundAddrs); err != nil { t.Errorf("Case [%d], unexpected sync IPVS 
virtual server error: %v", i, err) } // check list, err := proxier.ipvs.GetVirtualServers() if err != nil { t.Errorf("Case [%d], unexpected list IPVS virtual server error: %v", i, err) } if len(list) != 1 { t.Errorf("Case [%d], expect %d virtual servers, got %d", i, 1, len(list)) continue } if !list[0].Equal(testCases[i].newVirtualServer) { t.Errorf("Case [%d], unexpected mismatch, expect: %#v, got: %#v", i, testCases[i].newVirtualServer, list[0]) } } } func buildFakeProxier() (*iptablestest.FakeIPTables, *Proxier) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) } func getRules(ipt *iptablestest.FakeIPTables, chain utiliptables.Chain) []*iptablestest.Rule { var rules []*iptablestest.Rule buf := bytes.NewBuffer(nil) _ = ipt.SaveInto(utiliptables.TableNAT, buf) _ = ipt.SaveInto(utiliptables.TableFilter, buf) lines := strings.Split(buf.String(), "\n") for _, l := range lines { if !strings.HasPrefix(l, "-A ") { continue } rule, _ := iptablestest.ParseRule(l, false) if rule != nil && rule.Chain == chain { rules = append(rules, rule) } } return rules } // checkIptables to check expected iptables chain and rules. The got rules must have same number and order as the // expected rules. 
func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) { for epChain, epRules := range epIpt { rules := getRules(ipt, utiliptables.Chain(epChain)) if len(rules) != len(epRules) { t.Errorf("Expected %d iptables rule in chain %s, got %d", len(epRules), epChain, len(rules)) continue } for i, epRule := range epRules { rule := rules[i] if rule.Jump == nil || rule.Jump.Value != epRule.JumpChain { t.Errorf("Expected MatchSet=%s JumpChain=%s, got %s", epRule.MatchSet, epRule.JumpChain, rule.Raw) } if (epRule.MatchSet == "" && rule.MatchSet != nil) || (epRule.MatchSet != "" && (rule.MatchSet == nil || rule.MatchSet.Value != epRule.MatchSet)) { t.Errorf("Expected MatchSet=%s JumpChain=%s, got %s", epRule.MatchSet, epRule.JumpChain, rule.Raw) } } } } // checkIPSet to check expected ipset and entries func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) { for set, entries := range ipSet { ents, err := fp.ipset.ListEntries(set) if err != nil || len(ents) != len(entries) { t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents)) continue } expectedEntries := []string{} for _, entry := range entries { expectedEntries = append(expectedEntries, entry.String()) } sort.Strings(ents) sort.Strings(expectedEntries) if !reflect.DeepEqual(ents, expectedEntries) { t.Errorf("Check ipset entries failed for ipset: %q", set) } } } // checkIPVS to check expected ipvs service and destination func checkIPVS(t *testing.T, fp *Proxier, vs *netlinktest.ExpectedVirtualServer) { t.Helper() services, err := fp.ipvs.GetVirtualServers() if err != nil { t.Errorf("Failed to get ipvs services, err: %v", err) } if len(services) != vs.VSNum { t.Errorf("Expect %d ipvs services, got %d", vs.VSNum, len(services)) } for _, svc := range services { if svc.Address.String() == vs.IP && svc.Port == vs.Port && svc.Protocol == vs.Protocol { destinations, _ := fp.ipvs.GetRealServers(svc) if 
len(destinations) != len(vs.RS) { t.Errorf("Expected %d destinations, got %d destinations", len(vs.RS), len(destinations)) } if len(vs.RS) == 1 { if destinations[0].Address.String() != vs.RS[0].IP || destinations[0].Port != vs.RS[0].Port { t.Errorf("Unexpected mismatch destinations") } } } } } func TestCleanLegacyService(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) excludeCIDRs, _ := netutils.ParseCIDRs([]string{"3.3.3.0/24", "4.4.4.0/24"}) fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol) // All ipvs services that were processed in the latest sync loop. activeServices := sets.New("ipvs0", "ipvs1") // All ipvs services in the system. currentServices := map[string]*utilipvs.VirtualServer{ // Created by kube-proxy. "ipvs0": { Address: netutils.ParseIPSloppy("1.1.1.1"), Protocol: string(v1.ProtocolUDP), Port: 53, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by kube-proxy. "ipvs1": { Address: netutils.ParseIPSloppy("2.2.2.2"), Protocol: string(v1.ProtocolUDP), Port: 54, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by an external party. "ipvs2": { Address: netutils.ParseIPSloppy("3.3.3.3"), Protocol: string(v1.ProtocolUDP), Port: 55, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by an external party. "ipvs3": { Address: netutils.ParseIPSloppy("4.4.4.4"), Protocol: string(v1.ProtocolUDP), Port: 56, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by an external party. "ipvs4": { Address: netutils.ParseIPSloppy("5.5.5.5"), Protocol: string(v1.ProtocolUDP), Port: 57, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by kube-proxy, but now stale. 
"ipvs5": { Address: netutils.ParseIPSloppy("6.6.6.6"), Protocol: string(v1.ProtocolUDP), Port: 58, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, } for v := range currentServices { fp.ipvs.AddVirtualServer(currentServices[v]) } fp.cleanLegacyService(activeServices, currentServices) // ipvs4 and ipvs5 should have been cleaned. remainingVirtualServers, _ := fp.ipvs.GetVirtualServers() if len(remainingVirtualServers) != 4 { t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers)) } for _, vs := range remainingVirtualServers { // Checking that ipvs4 and ipvs5 were removed. if vs.Port == 57 { t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains") } if vs.Port == 58 { t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains") } } } func TestCleanLegacyServiceWithRealServers(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) // all deleted expect ipvs2 activeServices := sets.New("ipvs2") // All ipvs services in the system. 
currentServices := map[string]*utilipvs.VirtualServer{ "ipvs0": { // deleted with real servers Address: netutils.ParseIPSloppy("1.1.1.1"), Protocol: string(v1.ProtocolUDP), Port: 53, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, "ipvs1": { // deleted no real server Address: netutils.ParseIPSloppy("2.2.2.2"), Protocol: string(v1.ProtocolUDP), Port: 54, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, "ipvs2": { // not deleted Address: netutils.ParseIPSloppy("3.3.3.3"), Protocol: string(v1.ProtocolUDP), Port: 54, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, } // "ipvs0" has a real server, but it should still be deleted since the Service is deleted realServers := map[*utilipvs.VirtualServer]*utilipvs.RealServer{ { Address: netutils.ParseIPSloppy("1.1.1.1"), Protocol: string(v1.ProtocolUDP), Port: 53, Scheduler: "rr", Flags: utilipvs.FlagHashed, }: { Address: netutils.ParseIPSloppy("10.180.0.1"), Port: uint16(53), Weight: 1, }, } for v := range currentServices { fp.ipvs.AddVirtualServer(currentServices[v]) } for v, r := range realServers { fp.ipvs.AddRealServer(v, r) } fp.cleanLegacyService(activeServices, currentServices) remainingVirtualServers, _ := fp.ipvs.GetVirtualServers() if len(remainingVirtualServers) != 1 { t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. 
Got %v", 1, len(remainingVirtualServers)) } if remainingVirtualServers[0] != currentServices["ipvs2"] { t.Logf("actual virtual server: %v", remainingVirtualServers[0]) t.Logf("expected virtual server: %v", currentServices["ipvs0"]) t.Errorf("unexpected IPVS service") } } func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) gtm := NewGracefulTerminationManager(ipvs) excludeCIDRs, _ := netutils.ParseCIDRs([]string{"4.4.4.4/32"}) fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol) fp.gracefuldeleteManager = gtm vs := &utilipvs.VirtualServer{ Address: netutils.ParseIPSloppy("4.4.4.4"), Protocol: string(v1.ProtocolUDP), Port: 56, Scheduler: "rr", Flags: utilipvs.FlagHashed, } fp.ipvs.AddVirtualServer(vs) rss := []*utilipvs.RealServer{ { Address: netutils.ParseIPSloppy("10.10.10.10"), Port: 56, ActiveConn: 0, InactiveConn: 0, }, { Address: netutils.ParseIPSloppy("11.11.11.11"), Port: 56, ActiveConn: 0, InactiveConn: 0, }, } for _, rs := range rss { fp.ipvs.AddRealServer(vs, rs) } fp.netlinkHandle.EnsureDummyDevice(defaultDummyDevice) fp.netlinkHandle.EnsureAddressBind("4.4.4.4", defaultDummyDevice) fp.cleanLegacyService(nil, map[string]*utilipvs.VirtualServer{"ipvs0": vs}) fp.gracefuldeleteManager.tryDeleteRs() remainingRealServers, _ := fp.ipvs.GetRealServers(vs) if len(remainingRealServers) != 2 { t.Errorf("Expected number of remaining IPVS real servers after cleanup should be %v. Got %v", 2, len(remainingRealServers)) } } func TestCleanLegacyService6(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) excludeCIDRs, _ := netutils.ParseCIDRs([]string{"3000::/64", "4000::/64"}) fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv6Protocol) fp.nodeIP = netutils.ParseIPSloppy("::1") // All ipvs services that were processed in the latest sync loop. 
activeServices := sets.New("ipvs0", "ipvs1") // All ipvs services in the system. currentServices := map[string]*utilipvs.VirtualServer{ // Created by kube-proxy. "ipvs0": { Address: netutils.ParseIPSloppy("1000::1"), Protocol: string(v1.ProtocolUDP), Port: 53, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by kube-proxy. "ipvs1": { Address: netutils.ParseIPSloppy("1000::2"), Protocol: string(v1.ProtocolUDP), Port: 54, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by an external party. "ipvs2": { Address: netutils.ParseIPSloppy("3000::1"), Protocol: string(v1.ProtocolUDP), Port: 55, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by an external party. "ipvs3": { Address: netutils.ParseIPSloppy("4000::1"), Protocol: string(v1.ProtocolUDP), Port: 56, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by an external party. "ipvs4": { Address: netutils.ParseIPSloppy("5000::1"), Protocol: string(v1.ProtocolUDP), Port: 57, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, // Created by kube-proxy, but now stale. "ipvs5": { Address: netutils.ParseIPSloppy("1000::6"), Protocol: string(v1.ProtocolUDP), Port: 58, Scheduler: "rr", Flags: utilipvs.FlagHashed, }, } for v := range currentServices { fp.ipvs.AddVirtualServer(currentServices[v]) } fp.cleanLegacyService(activeServices, currentServices) // ipvs4 and ipvs5 should have been cleaned. remainingVirtualServers, _ := fp.ipvs.GetVirtualServers() if len(remainingVirtualServers) != 4 { t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers)) } for _, vs := range remainingVirtualServers { // Checking that ipvs4 and ipvs5 were removed. if vs.Port == 57 { t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains") } if vs.Port == 58 { t.Errorf("Expected ipvs5 to be removed after cleanup. 
It still remains") } } } func TestMultiPortServiceBindAddr(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) service1 := makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 1235, 0, 0) }) service2 := makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0) }) service3 := makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 1235, 0, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 1236, 0, 0) }) fp.servicesSynced = true // first, add multi-port service1 fp.OnServiceAdd(service1) fp.syncProxyRules() remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(defaultDummyDevice) // should only remain address "172.16.55.4" if len(remainingAddrs) != 1 { t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs)) } if remainingAddrs[0] != "172.16.55.4" { t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0]) } // update multi-port service1 to single-port service2 fp.OnServiceUpdate(service1, service2) fp.syncProxyRules() remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(defaultDummyDevice) // should still only remain address "172.16.55.4" if len(remainingAddrs) != 1 { t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. 
Got %v", 1, len(remainingAddrs)) } else if remainingAddrs[0] != "172.16.55.4" { t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0]) } // update single-port service2 to multi-port service3 fp.OnServiceUpdate(service2, service3) fp.syncProxyRules() remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(defaultDummyDevice) // should still only remain address "172.16.55.4" if len(remainingAddrs) != 1 { t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs)) } else if remainingAddrs[0] != "172.16.55.4" { t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0]) } // delete multi-port service3 fp.OnServiceDelete(service3) fp.syncProxyRules() remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(defaultDummyDevice) // all addresses should be unbound if len(remainingAddrs) != 0 { t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 0, len(remainingAddrs)) } } func Test_getFirstColumn(t *testing.T) { testCases := []struct { name string fileContent string want []string wantErr bool }{ { name: "valid content", fileContent: `libiscsi_tcp 28672 1 iscsi_tcp, Live 0xffffffffc07ae000 libiscsi 57344 3 ib_iser,iscsi_tcp,libiscsi_tcp, Live 0xffffffffc079a000 raid10 57344 0 - Live 0xffffffffc0597000`, want: []string{"libiscsi_tcp", "libiscsi", "raid10"}, wantErr: false, }, } for _, test := range testCases { t.Run(test.name, func(t *testing.T) { got, err := getFirstColumn(strings.NewReader(test.fileContent)) if (err != nil) != test.wantErr { t.Errorf("getFirstColumn() error = %v, wantErr %v", err, test.wantErr) return } if !reflect.DeepEqual(got, test.want) { t.Errorf("getFirstColumn() = %v, want %v", got, test.want) } }) } } // The majority of EndpointSlice specific tests are not ipvs specific and focus on // the shared EndpointsChangeTracker and EndpointSliceCache. 
This test ensures that the // ipvs proxier supports translating EndpointSlices to ipvs output. func TestEndpointSliceE2E(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.servicesSynced = true fp.endpointSlicesSynced = true // Add initial service serviceName := "svc1" namespaceName := "ns1" fp.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}}, }, }) // Add initial endpoint slice endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{{ Addresses: []string{"10.0.1.1"}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.2"}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To("node2"), }, { Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To("node3"), }, { // not ready endpoints should be ignored Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{Ready: ptr.To(false)}, NodeName: ptr.To("node3"), }}, } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice update assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 1, 
activeEntries1.Len(), "Expected 1 active entry in KUBE-LOOP-BACK") assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") virtualServers1, vsErr1 := ipvs.GetVirtualServers() assert.Nil(t, vsErr1, "Expected no error getting virtual servers") assert.Len(t, virtualServers1, 1, "Expected 1 virtual server") realServers1, rsErr1 := ipvs.GetRealServers(virtualServers1[0]) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 3, "Expected 3 real servers") assert.Equal(t, realServers1[0].String(), "10.0.1.1:80") assert.Equal(t, realServers1[1].String(), "10.0.1.2:80") assert.Equal(t, realServers1[2].String(), "10.0.1.3:80") fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") virtualServers2, vsErr2 := ipvs.GetVirtualServers() assert.Nil(t, vsErr2, "Expected no error getting virtual servers") assert.Len(t, virtualServers2, 1, "Expected 1 virtual server") realServers2, rsErr2 := ipvs.GetRealServers(virtualServers2[0]) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } func TestHealthCheckNodePortE2E(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.servicesSynced = true fp.endpointSlicesSynced = true // Add initial service serviceName := "svc1" namespaceName := "ns1" svc := v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Name: "", 
TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}}, Type: "LoadBalancer", HealthCheckNodePort: 30000, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, }, } fp.OnServiceAdd(&svc) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after service's being created assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"]) activeEntries1 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-HEALTH-CHECK-NODE-PORT") assert.Equal(t, true, activeEntries1.Has("30000"), "Expected activeEntries to reference hc node port in spec") // Update health check node port in the spec newSvc := svc newSvc.Spec.HealthCheckNodePort = 30001 fp.OnServiceUpdate(&svc, &newSvc) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after service's being updated assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"]) activeEntries2 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries assert.Equal(t, 1, activeEntries2.Len(), "Expected 1 active entry in KUBE-HEALTH-CHECK-NODE-PORT") assert.Equal(t, true, activeEntries2.Has("30001"), "Expected activeEntries to reference updated hc node port in spec") fp.OnServiceDelete(&svc) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"]) activeEntries3 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-HEALTH-CHECK-NODE-PORT") } // Test_HealthCheckNodePortWhenTerminating tests that health check node ports are not enabled when all local endpoints are terminating func Test_HealthCheckNodePortWhenTerminating(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.servicesSynced = true // 
fp.endpointsSynced = true fp.endpointSlicesSynced = true serviceName := "svc1" namespaceName := "ns1" fp.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}}, }, }) endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{{ Addresses: []string{"10.0.1.1"}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.2"}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, }, { // not ready endpoints should be ignored Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{Ready: ptr.To(false)}, NodeName: ptr.To(testHostname), }}, } fp.OnEndpointSliceAdd(endpointSlice) _ = fp.endpointsMap.Update(fp.endpointsChanges) localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() if len(localReadyEndpoints) != 1 { t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(localReadyEndpoints)) } // set all endpoints to terminating endpointSliceTerminating := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: 
ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{{ Addresses: []string{"10.0.1.1"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.2"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { // not ready endpoints should be ignored Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }}, } fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating) _ = fp.endpointsMap.Update(fp.endpointsChanges) localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints() if len(localReadyEndpoints) != 0 { t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(localReadyEndpoints)) } } func TestFilterCIDRs(t *testing.T) { var cidrList []string var cidrs []string var expected []string cidrs = filterCIDRs(true, []string{}) if len(cidrs) > 0 { t.Errorf("An empty list produces a non-empty return %v", cidrs) } cidrList = []string{"1000::/64", "10.0.0.0/16", "11.0.0.0/16", "2000::/64"} expected = []string{"1000::/64", "2000::/64"} cidrs = filterCIDRs(true, cidrList) if !reflect.DeepEqual(cidrs, expected) { t.Errorf("cidrs %v is not expected %v", cidrs, expected) } expected = []string{"10.0.0.0/16", "11.0.0.0/16"} cidrs = filterCIDRs(false, cidrList) if !reflect.DeepEqual(cidrs, expected) { t.Errorf("cidrs %v is not expected %v", cidrs, expected) } cidrList = []string{"1000::/64", "2000::/64"} expected = []string{} cidrs = filterCIDRs(false, cidrList) if 
len(cidrs) > 0 { t.Errorf("cidrs %v is not expected %v", cidrs, expected) } } func TestCreateAndLinkKubeChain(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.createAndLinkKubeChain() expectedNATChains := `:KUBE-SERVICES - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-NODE-PORT - [0:0] :KUBE-LOAD-BALANCER - [0:0] :KUBE-MARK-MASQ - [0:0] ` expectedFilterChains := `:KUBE-FORWARD - [0:0] :KUBE-NODE-PORT - [0:0] :KUBE-PROXY-FIREWALL - [0:0] :KUBE-SOURCE-RANGES-FIREWALL - [0:0] :KUBE-IPVS-FILTER - [0:0] :KUBE-IPVS-OUT-FILTER - [0:0] ` assert.Equal(t, expectedNATChains, fp.natChains.String()) assert.Equal(t, expectedFilterChains, fp.filterChains.String()) } // This test ensures that the iptables proxier supports translating Endpoints to // iptables output when internalTrafficPolicy is specified func TestTestInternalTrafficPolicyE2E(t *testing.T) { type endpoint struct { ip string hostname string } testCases := []struct { name string internalTrafficPolicy *v1.ServiceInternalTrafficPolicy endpoints []endpoint expectVirtualServer bool expectLocalEntries bool expectLocalRealServerNum int expectLocalRealServers []string }{ { name: "internalTrafficPolicy is cluster with non-zero local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster), endpoints: []endpoint{ {"10.0.1.1", testHostname}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, expectVirtualServer: true, expectLocalEntries: true, expectLocalRealServerNum: 3, expectLocalRealServers: []string{ "10.0.1.1:80", "10.0.1.2:80", "10.0.1.3:80", }, }, { name: "internalTrafficPolicy is cluster with zero local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster), endpoints: []endpoint{ {"10.0.1.1", "host0"}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, expectVirtualServer: false, expectLocalEntries: false, expectLocalRealServerNum: 3, 
expectLocalRealServers: []string{ "10.0.1.1:80", "10.0.1.2:80", "10.0.1.3:80", }, }, { name: "internalTrafficPolicy is local with non-zero local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"10.0.1.1", testHostname}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, expectVirtualServer: true, expectLocalEntries: true, expectLocalRealServerNum: 1, expectLocalRealServers: []string{ "10.0.1.1:80", }, }, { name: "internalTrafficPolicy is local with zero local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"10.0.1.1", "host0"}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, expectVirtualServer: false, expectLocalEntries: false, expectLocalRealServerNum: 0, expectLocalRealServers: []string{}, }, } for _, tc := range testCases { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.servicesSynced = true // fp.endpointsSynced = true fp.endpointSlicesSynced = true // Add initial service serviceName := "svc1" namespaceName := "ns1" svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}}, }, } if tc.internalTrafficPolicy != nil { svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy } fp.OnServiceAdd(svc) // Add initial endpoint slice endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, } for _, ep := range 
tc.endpoints { endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{ Addresses: []string{ep.ip}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(ep.hostname), }) } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice update assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries if tc.expectLocalEntries { assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-LOOP-BACK") } else { assert.Equal(t, 0, activeEntries1.Len(), "Expected no active entry in KUBE-LOOP-BACK") } if tc.expectVirtualServer { virtualServers1, vsErr1 := ipvs.GetVirtualServers() assert.Nil(t, vsErr1, "Expected no error getting virtual servers") assert.Len(t, virtualServers1, 1, "Expected 1 virtual server") realServers1, rsErr1 := ipvs.GetRealServers(virtualServers1[0]) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, tc.expectLocalRealServerNum, fmt.Sprintf("Expected %d real servers", tc.expectLocalRealServerNum)) for i := 0; i < tc.expectLocalRealServerNum; i++ { assert.Equal(t, realServers1[i].String(), tc.expectLocalRealServers[i]) } } fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries3 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") virtualServers2, vsErr2 := ipvs.GetVirtualServers() assert.Nil(t, vsErr2, "Expected no error getting virtual servers") assert.Len(t, virtualServers2, 1, "Expected 1 virtual server") realServers2, rsErr2 := ipvs.GetRealServers(virtualServers2[0]) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } } // 
Test_EndpointSliceReadyAndTerminatingCluster tests that when there are ready and ready + terminating // endpoints and the traffic policy is "Cluster", only the ready endpoints are used. func Test_EndpointSliceReadyAndTerminatingCluster(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.servicesSynced = true // fp.endpointsSynced = true fp.endpointSlicesSynced = true serviceName := "svc1" // Add initial service namespaceName := "ns1" fp.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Type: v1.ServiceTypeNodePort, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyCluster, InternalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster), ExternalIPs: []string{ "1.2.3.4", }, Ports: []v1.ServicePort{ { Name: "", Port: 80, TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP, }, }, }, }) // Add initial endpoint slice endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { Addresses: []string{"10.0.1.1"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.2"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: 
ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.5"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To("another-host"), }, }, } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice update assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 4, activeEntries1.Len(), "Expected 4 active entry in KUBE-LOOP-BACK") assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference third pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.4,tcp:80,10.0.1.4"), "Expected activeEntries to reference fourth pod") virtualServers, vsErr := ipvs.GetVirtualServers() assert.Nil(t, vsErr, "Expected no error getting virtual servers") assert.Len(t, virtualServers, 2, "Expected 2 virtual server") var clusterIPServer, externalIPServer *utilipvs.VirtualServer for _, virtualServer := range virtualServers { if virtualServer.Address.String() == "172.20.1.1" { clusterIPServer = virtualServer } if virtualServer.Address.String() == "1.2.3.4" { externalIPServer = virtualServer } } // clusterIP should route to cluster-wide ready endpoints realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 3, "Expected 3 real servers") assert.Equal(t, 
realServers1[0].String(), "10.0.1.1:80") assert.Equal(t, realServers1[1].String(), "10.0.1.2:80") assert.Equal(t, realServers1[2].String(), "10.0.1.5:80") // externalIP should route to cluster-wide ready endpoints realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 3, "Expected 3 real servers") assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") assert.Equal(t, realServers1[2].String(), "10.0.1.5:80") fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") virtualServers, vsErr = ipvs.GetVirtualServers() assert.Nil(t, vsErr, "Expected no error getting virtual servers") assert.Len(t, virtualServers, 2, "Expected 2 virtual server") for _, virtualServer := range virtualServers { if virtualServer.Address.String() == "172.20.1.1" { clusterIPServer = virtualServer } if virtualServer.Address.String() == "1.2.3.4" { externalIPServer = virtualServer } } realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 0, "Expected 0 real servers") realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } // Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating // endpoints, only the ready endpoints are used. 
func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.servicesSynced = true // fp.endpointsSynced = true fp.endpointSlicesSynced = true serviceName := "svc1" // Add initial service namespaceName := "ns1" fp.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Type: v1.ServiceTypeNodePort, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, InternalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster), ExternalIPs: []string{ "1.2.3.4", }, Ports: []v1.ServicePort{ { Name: "", Port: 80, TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP, }, }, }, }) // Add initial endpoint slice endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { Addresses: []string{"10.0.1.1"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.2"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: 
ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.5"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To("another-host"), }, }, } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice update assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 4, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK") assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.4,tcp:80,10.0.1.4"), "Expected activeEntries to reference second (local) pod") virtualServers, vsErr := ipvs.GetVirtualServers() assert.Nil(t, vsErr, "Expected no error getting virtual servers") assert.Len(t, virtualServers, 2, "Expected 2 virtual server") var clusterIPServer, externalIPServer *utilipvs.VirtualServer for _, virtualServer := range virtualServers { if virtualServer.Address.String() == "172.20.1.1" { clusterIPServer = virtualServer } if virtualServer.Address.String() == "1.2.3.4" { externalIPServer = virtualServer } } // clusterIP should route to cluster-wide ready endpoints realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 3, "Expected 3 real servers") assert.Equal(t, realServers1[0].String(), "10.0.1.1:80") assert.Equal(t, realServers1[1].String(), "10.0.1.2:80") assert.Equal(t, realServers1[2].String(), "10.0.1.5:80") // 
externalIP should route to local ready + non-terminating endpoints if they exist realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 2, "Expected 2 real servers") assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") virtualServers, vsErr = ipvs.GetVirtualServers() assert.Nil(t, vsErr, "Expected no error getting virtual servers") assert.Len(t, virtualServers, 2, "Expected 2 virtual server") for _, virtualServer := range virtualServers { if virtualServer.Address.String() == "172.20.1.1" { clusterIPServer = virtualServer } if virtualServer.Address.String() == "1.2.3.4" { externalIPServer = virtualServer } } realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 0, "Expected 0 real servers") realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } // Test_EndpointSliceOnlyReadyTerminatingCluster tests that when there are only ready terminating // endpoints and the traffic policy is "Cluster", we fall back to terminating endpoints. 
func Test_EndpointSliceOnlyReadyAndTerminatingCluster(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.servicesSynced = true // fp.endpointsSynced = true fp.endpointSlicesSynced = true // Add initial service serviceName := "svc1" namespaceName := "ns1" fp.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Type: v1.ServiceTypeNodePort, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyCluster, InternalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster), ExternalIPs: []string{ "1.2.3.4", }, Ports: []v1.ServicePort{ { Name: "", Port: 80, TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP, }, }, }, }) // Add initial endpoint slice endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { Addresses: []string{"10.0.1.1"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.2"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), 
Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To("another-host"), }, { Addresses: []string{"10.0.1.5"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(false), }, NodeName: ptr.To("another-host"), }, }, } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice update assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK") assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod") virtualServers, vsErr := ipvs.GetVirtualServers() assert.Nil(t, vsErr, "Expected no error getting virtual servers") assert.Len(t, virtualServers, 2, "Expected 2 virtual server") var clusterIPServer, externalIPServer *utilipvs.VirtualServer for _, virtualServer := range virtualServers { if virtualServer.Address.String() == "172.20.1.1" { clusterIPServer = virtualServer } if virtualServer.Address.String() == "1.2.3.4" { externalIPServer = virtualServer } } // clusterIP should fall back to cluster-wide ready + terminating endpoints realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 3, "Expected 1 real servers") assert.Equal(t, realServers1[0].String(), "10.0.1.1:80") assert.Equal(t, realServers1[1].String(), "10.0.1.2:80") assert.Equal(t, realServers1[2].String(), "10.0.1.4:80") // externalIP should fall back to ready + terminating endpoints realServers2, rsErr2 := 
ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 3, "Expected 2 real servers") assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") assert.Equal(t, realServers2[2].String(), "10.0.1.4:80") fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") virtualServers, vsErr = ipvs.GetVirtualServers() assert.Nil(t, vsErr, "Expected no error getting virtual servers") assert.Len(t, virtualServers, 2, "Expected 2 virtual server") for _, virtualServer := range virtualServers { if virtualServer.Address.String() == "172.20.1.1" { clusterIPServer = virtualServer } if virtualServer.Address.String() == "1.2.3.4" { externalIPServer = virtualServer } } realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 0, "Expected 0 real servers") realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } // Test_EndpointSliceOnlyReadyTerminatingLocal tests that when there are only local ready terminating // endpoints, we fall back to those endpoints. 
func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) fp.servicesSynced = true // fp.endpointsSynced = true fp.endpointSlicesSynced = true // Add initial service serviceName := "svc1" namespaceName := "ns1" fp.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Type: v1.ServiceTypeNodePort, ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, InternalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster), ExternalIPs: []string{ "1.2.3.4", }, Ports: []v1.ServicePort{ { Name: "", Port: 80, TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP, }, }, }, }) // Add initial endpoint slice endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To(""), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { Addresses: []string{"10.0.1.1"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.2"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.3"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), Serving: ptr.To(false), Terminating: ptr.To(true), }, NodeName: ptr.To(testHostname), }, { Addresses: []string{"10.0.1.4"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(false), 
Serving: ptr.To(true), Terminating: ptr.To(true), }, NodeName: ptr.To("another-host"), }, { Addresses: []string{"10.0.1.5"}, Conditions: discovery.EndpointConditions{ Ready: ptr.To(true), Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: ptr.To("another-host"), }, }, } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice update assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK") assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod") assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod") virtualServers, vsErr := ipvs.GetVirtualServers() assert.Nil(t, vsErr, "Expected no error getting virtual servers") assert.Len(t, virtualServers, 2, "Expected 2 virtual server") var clusterIPServer, externalIPServer *utilipvs.VirtualServer for _, virtualServer := range virtualServers { if virtualServer.Address.String() == "172.20.1.1" { clusterIPServer = virtualServer } if virtualServer.Address.String() == "1.2.3.4" { externalIPServer = virtualServer } } // clusterIP should route to cluster-wide Ready endpoints realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 1, "Expected 1 real servers") assert.Equal(t, realServers1[0].String(), "10.0.1.5:80") // externalIP should fall back to local ready + terminating endpoints realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 2, "Expected 2 real 
servers") assert.Equal(t, realServers2[0].String(), "10.0.1.1:80") assert.Equal(t, realServers2[1].String(), "10.0.1.2:80") fp.OnEndpointSliceDelete(endpointSlice) fp.syncProxyRules() // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") virtualServers, vsErr = ipvs.GetVirtualServers() assert.Nil(t, vsErr, "Expected no error getting virtual servers") assert.Len(t, virtualServers, 2, "Expected 2 virtual server") for _, virtualServer := range virtualServers { if virtualServer.Address.String() == "172.20.1.1" { clusterIPServer = virtualServer } if virtualServer.Address.String() == "1.2.3.4" { externalIPServer = virtualServer } } realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer) assert.Nil(t, rsErr1, "Expected no error getting real servers") assert.Len(t, realServers1, 0, "Expected 0 real servers") realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer) assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } func TestIpIsValidForSet(t *testing.T) { testCases := []struct { isIPv6 bool ip string res bool }{ { false, "127.0.0.1", false, }, { false, "127.0.0.0", false, }, { false, "127.6.7.8", false, }, { false, "8.8.8.8", true, }, { false, "192.168.0.1", true, }, { false, "169.254.0.0", true, }, { false, "::ffff:169.254.0.0", // IPv6 mapped IPv4 true, }, { false, "1000::", false, }, // IPv6 { true, "::1", false, }, { true, "1000::", true, }, { true, "fe80::200:ff:fe01:1", false, }, { true, "8.8.8.8", false, }, { true, "::ffff:8.8.8.8", false, }, } for _, tc := range testCases { v := &netlinkHandle{} v.isIPv6 = tc.isIPv6 ip := netutils.ParseIPSloppy(tc.ip) if ip == nil { t.Errorf("Parse error: %s", tc.ip) } if v.isValidForSet(ip) != tc.res { if tc.isIPv6 { t.Errorf("IPv6: 
%s", tc.ip) } else { t.Errorf("IPv4: %s", tc.ip) } } } } func TestNoEndpointsMetric(t *testing.T) { type endpoint struct { ip string hostname string } metrics.RegisterMetrics() testCases := []struct { name string internalTrafficPolicy *v1.ServiceInternalTrafficPolicy externalTrafficPolicy v1.ServiceExternalTrafficPolicy endpoints []endpoint expectedSyncProxyRulesNoLocalEndpointsTotalInternal int expectedSyncProxyRulesNoLocalEndpointsTotalExternal int }{ { name: "internalTrafficPolicy is set and there are local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"10.0.1.1", testHostname}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, }, { name: "externalTrafficPolicy is set and there are local endpoints", externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{ {"10.0.1.1", testHostname}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, }, { name: "both policies are set and there are local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{ {"10.0.1.1", testHostname}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, }, { name: "internalTrafficPolicy is set and there are no local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), endpoints: []endpoint{ {"10.0.1.1", "host0"}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, }, { name: "externalTrafficPolicy is set and there are no local endpoints", externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{ {"10.0.1.1", "host0"}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, }, { name: "Both policies are set and there are no local endpoints", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), externalTrafficPolicy: 
v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{ {"10.0.1.1", "host0"}, {"10.0.1.2", "host1"}, {"10.0.1.3", "host2"}, }, expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, }, { name: "Both policies are set and there are no endpoints at all", internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal), externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, endpoints: []endpoint{}, expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 0, expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 0, }, } for _, tc := range testCases { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, []string{"10.0.0.1"}, nil, v1.IPv4Protocol) fp.servicesSynced = true // fp.endpointsSynced = true fp.endpointSlicesSynced = true // Add initial service serviceName := "svc1" namespaceName := "ns1" svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Name: "p80", Port: 80, TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP, NodePort: 30000}}, }, } if tc.internalTrafficPolicy != nil { svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy } if tc.externalTrafficPolicy != "" { svc.Spec.Type = v1.ServiceTypeNodePort svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy } fp.OnServiceAdd(svc) // Add initial endpoint slice endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-1", serviceName), Namespace: namespaceName, Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }}, AddressType: discovery.AddressTypeIPv4, } for _, ep := range tc.endpoints { endpointSlice.Endpoints = 
append(endpointSlice.Endpoints, discovery.Endpoint{ Addresses: []string{ep.ip}, Conditions: discovery.EndpointConditions{Ready: ptr.To(true)}, NodeName: ptr.To(ep.hostname), }) } fp.OnEndpointSliceAdd(endpointSlice) fp.syncProxyRules() syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal")) if err != nil { t.Errorf("failed to get %s value(internal), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) } if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) { t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal) } syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external")) if err != nil { t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) } if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) { t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(external): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal) } } } func TestDismissLocalhostRuleExist(t *testing.T) { tests := []struct { name string ipFamily v1.IPFamily src string }{ { name: "ipv4 rule", ipFamily: v1.IPv4Protocol, src: "127.0.0.0/8", }, { name: "ipv6 rule", ipFamily: v1.IPv6Protocol, src: "::1/128", }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { ipt := iptablestest.NewFake() if test.ipFamily == v1.IPv6Protocol { ipt = iptablestest.NewIPv6Fake() } ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, test.ipFamily) 
fp.syncProxyRules() rules := getRules(ipt, kubeServicesChain) if len(rules) <= 0 { t.Errorf("skip loop back ip in kubeservice chain not exist") return } if !rules[0].Jump.Matches("RETURN") || !rules[0].SourceAddress.Matches(test.src) { t.Errorf("rules not match, expect jump: %s, got: %s; expect source address: %s, got: %s", "RETURN", rules[0].Jump.String(), test.src, rules[0].SourceAddress.String()) } }) } } func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) { testCases := []struct { name string ipModeEnabled bool svcIP string svcLBIP string ipMode *v1.LoadBalancerIPMode expectedServices int }{ /* LoadBalancerIPMode disabled */ { name: "LoadBalancerIPMode disabled, ipMode Proxy", ipModeEnabled: false, svcIP: "10.20.30.41", svcLBIP: "1.2.3.4", ipMode: ptr.To(v1.LoadBalancerIPModeProxy), expectedServices: 2, }, { name: "LoadBalancerIPMode disabled, ipMode VIP", ipModeEnabled: false, svcIP: "10.20.30.42", svcLBIP: "1.2.3.5", ipMode: ptr.To(v1.LoadBalancerIPModeVIP), expectedServices: 2, }, { name: "LoadBalancerIPMode disabled, ipMode nil", ipModeEnabled: false, svcIP: "10.20.30.43", svcLBIP: "1.2.3.6", ipMode: nil, expectedServices: 2, }, /* LoadBalancerIPMode enabled */ { name: "LoadBalancerIPMode enabled, ipMode Proxy", ipModeEnabled: true, svcIP: "10.20.30.41", svcLBIP: "1.2.3.4", ipMode: ptr.To(v1.LoadBalancerIPModeProxy), expectedServices: 1, }, { name: "LoadBalancerIPMode enabled, ipMode VIP", ipModeEnabled: true, svcIP: "10.20.30.42", svcLBIP: "1.2.3.5", ipMode: ptr.To(v1.LoadBalancerIPModeVIP), expectedServices: 2, }, { name: "LoadBalancerIPMode enabled, ipMode nil", ipModeEnabled: true, svcIP: "10.20.30.43", svcLBIP: "1.2.3.6", ipMode: nil, expectedServices: 2, }, } svcPort := 80 svcNodePort := 3001 svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, 
features.LoadBalancerIPMode, testCase.ipModeEnabled)() _, fp := buildFakeProxier() makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = testCase.svcIP svc.Spec.Ports = []v1.ServicePort{{ Name: svcPortName.Port, Port: int32(svcPort), Protocol: v1.ProtocolTCP, NodePort: int32(svcNodePort), }} svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ IP: testCase.svcLBIP, IPMode: testCase.ipMode, }} }), ) makeEndpointSliceMap(fp, makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { eps.AddressType = discovery.AddressTypeIPv4 eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"10.180.0.1"}, }} eps.Ports = []discovery.EndpointPort{{ Name: ptr.To("p80"), Port: ptr.To[int32](80), Protocol: ptr.To(v1.ProtocolTCP), }} }), ) fp.syncProxyRules() services, err := fp.ipvs.GetVirtualServers() if err != nil { t.Errorf("Failed to get ipvs services, err: %v", err) } if len(services) != testCase.expectedServices { t.Errorf("Expected %d ipvs services, got %d", testCase.expectedServices, len(services)) } }) } }