1
2
3
4
19
20 package ipvs
21
22 import (
23 "bytes"
24 "fmt"
25 "net"
26 "reflect"
27 "sort"
28 "strings"
29 "testing"
30 "time"
31
32 "github.com/stretchr/testify/assert"
33 v1 "k8s.io/api/core/v1"
34 discovery "k8s.io/api/discovery/v1"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/apimachinery/pkg/types"
37 "k8s.io/apimachinery/pkg/util/intstr"
38 "k8s.io/apimachinery/pkg/util/sets"
39 utilfeature "k8s.io/apiserver/pkg/util/feature"
40 featuregatetesting "k8s.io/component-base/featuregate/testing"
41 "k8s.io/component-base/metrics/testutil"
42 "k8s.io/kubernetes/pkg/features"
43 "k8s.io/kubernetes/pkg/proxy"
44 "k8s.io/kubernetes/pkg/proxy/conntrack"
45 "k8s.io/kubernetes/pkg/proxy/healthcheck"
46 utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
47 ipsettest "k8s.io/kubernetes/pkg/proxy/ipvs/ipset/testing"
48 netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
49 utilipvs "k8s.io/kubernetes/pkg/proxy/ipvs/util"
50 ipvstest "k8s.io/kubernetes/pkg/proxy/ipvs/util/testing"
51 "k8s.io/kubernetes/pkg/proxy/metrics"
52 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
53 proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
54 proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
55 "k8s.io/kubernetes/pkg/util/async"
56 utiliptables "k8s.io/kubernetes/pkg/util/iptables"
57 iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
58 netutils "k8s.io/utils/net"
59 "k8s.io/utils/ptr"
60 )
61
62 const testHostname = "test-hostname"
63
64
65 type fakeIpvs struct {
66 ipvsErr string
67 vsCreated bool
68 }
69
70 func (f *fakeIpvs) Flush() error {
71 return nil
72 }
73 func (f *fakeIpvs) AddVirtualServer(*utilipvs.VirtualServer) error {
74 if f.ipvsErr == "AddVirtualServer" {
75 return fmt.Errorf("oops")
76 }
77 f.vsCreated = true
78 return nil
79 }
80 func (f *fakeIpvs) UpdateVirtualServer(*utilipvs.VirtualServer) error {
81 return nil
82 }
83 func (f *fakeIpvs) DeleteVirtualServer(*utilipvs.VirtualServer) error {
84 if f.ipvsErr == "DeleteVirtualServer" {
85 return fmt.Errorf("oops")
86 }
87 return nil
88 }
89 func (f *fakeIpvs) GetVirtualServer(*utilipvs.VirtualServer) (*utilipvs.VirtualServer, error) {
90 return nil, nil
91 }
92 func (f *fakeIpvs) GetVirtualServers() ([]*utilipvs.VirtualServer, error) {
93 if f.ipvsErr == "GetVirtualServers" {
94 return nil, fmt.Errorf("oops")
95 }
96 if f.vsCreated {
97 vs := []*utilipvs.VirtualServer{{}}
98 return vs, nil
99 }
100 return nil, nil
101 }
102 func (f *fakeIpvs) AddRealServer(*utilipvs.VirtualServer, *utilipvs.RealServer) error {
103 return nil
104 }
105 func (f *fakeIpvs) GetRealServers(*utilipvs.VirtualServer) ([]*utilipvs.RealServer, error) {
106 return nil, nil
107 }
108 func (f *fakeIpvs) DeleteRealServer(*utilipvs.VirtualServer, *utilipvs.RealServer) error {
109 return nil
110 }
111 func (f *fakeIpvs) UpdateRealServer(*utilipvs.VirtualServer, *utilipvs.RealServer) error {
112 return nil
113 }
114 func (f *fakeIpvs) ConfigureTimeouts(time.Duration, time.Duration, time.Duration) error {
115 return nil
116 }
117
118
119 type fakeIPSetVersioner struct {
120 version string
121 err error
122 }
123
124 func (fake *fakeIPSetVersioner) GetVersion() (string, error) {
125 return fake.version, fake.err
126 }
127
128 func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []string, excludeCIDRs []*net.IPNet, ipFamily v1.IPFamily) *Proxier {
129
130 netlinkHandle := netlinktest.NewFakeNetlinkHandle(ipFamily == v1.IPv6Protocol)
131 netlinkHandle.SetLocalAddresses("eth0", nodeIPs...)
132
133
134 ipsetList := make(map[string]*IPSet)
135 for _, is := range ipsetInfo {
136 ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment)
137 }
138 p := &Proxier{
139 svcPortMap: make(proxy.ServicePortMap),
140 serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
141 endpointsMap: make(proxy.EndpointsMap),
142 endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil),
143 excludeCIDRs: excludeCIDRs,
144 iptables: ipt,
145 ipvs: ipvs,
146 ipset: ipset,
147 conntrack: conntrack.NewFake(),
148 strictARP: false,
149 localDetector: proxyutiliptables.NewNoOpLocalDetector(),
150 hostname: testHostname,
151 serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
152 ipvsScheduler: defaultScheduler,
153 iptablesData: bytes.NewBuffer(nil),
154 filterChainsData: bytes.NewBuffer(nil),
155 natChains: proxyutil.NewLineBuffer(),
156 natRules: proxyutil.NewLineBuffer(),
157 filterChains: proxyutil.NewLineBuffer(),
158 filterRules: proxyutil.NewLineBuffer(),
159 netlinkHandle: netlinkHandle,
160 ipsetList: ipsetList,
161 nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil, nil),
162 networkInterfacer: proxyutiltest.NewFakeNetwork(),
163 gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
164 ipFamily: ipFamily,
165 }
166 p.setInitialized(true)
167 p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
168 return p
169 }
170
171 func makeNSN(namespace, name string) types.NamespacedName {
172 return types.NamespacedName{Namespace: namespace, Name: name}
173 }
174
175 func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
176 for i := range allServices {
177 proxier.OnServiceAdd(allServices[i])
178 }
179
180 proxier.mu.Lock()
181 defer proxier.mu.Unlock()
182 proxier.servicesSynced = true
183 }
184
185 func makeEndpointSliceMap(proxier *Proxier, allEpSlices ...*discovery.EndpointSlice) {
186 for i := range allEpSlices {
187 proxier.OnEndpointSliceAdd(allEpSlices[i])
188 }
189 proxier.mu.Lock()
190 defer proxier.mu.Unlock()
191 proxier.endpointSlicesSynced = true
192 }
193
194 func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
195 svc := &v1.Service{
196 ObjectMeta: metav1.ObjectMeta{
197 Name: name,
198 Namespace: namespace,
199 Annotations: map[string]string{},
200 },
201 Spec: v1.ServiceSpec{},
202 Status: v1.ServiceStatus{},
203 }
204 svcFunc(svc)
205 return svc
206 }
207
208 func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) {
209 for i := range allEndpointSlices {
210 proxier.OnEndpointSliceAdd(allEndpointSlices[i])
211 }
212 }
213
214 func makeTestEndpointSlice(namespace, name string, sliceNum int, epsFunc func(*discovery.EndpointSlice)) *discovery.EndpointSlice {
215 eps := &discovery.EndpointSlice{
216 ObjectMeta: metav1.ObjectMeta{
217 Name: fmt.Sprintf("%s-%d", name, sliceNum),
218 Namespace: namespace,
219 Labels: map[string]string{discovery.LabelServiceName: name},
220 },
221 }
222 epsFunc(eps)
223 return eps
224 }
225
226 func TestCleanupLeftovers(t *testing.T) {
227 ipt := iptablestest.NewFake()
228 ipvs := ipvstest.NewFake()
229 ipset := ipsettest.NewFake(testIPSetVersion)
230 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
231 svcIP := "10.20.30.41"
232 svcPort := 80
233 svcNodePort := 3001
234 svcPortName := proxy.ServicePortName{
235 NamespacedName: makeNSN("ns1", "svc1"),
236 Port: "p80",
237 }
238
239 makeServiceMap(fp,
240 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
241 svc.Spec.Type = "NodePort"
242 svc.Spec.ClusterIP = svcIP
243 svc.Spec.Ports = []v1.ServicePort{{
244 Name: svcPortName.Port,
245 Port: int32(svcPort),
246 Protocol: v1.ProtocolTCP,
247 NodePort: int32(svcNodePort),
248 }}
249 }),
250 )
251 epIP := "10.180.0.1"
252 populateEndpointSlices(fp,
253 makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
254 eps.AddressType = discovery.AddressTypeIPv4
255 eps.Endpoints = []discovery.Endpoint{{
256 Addresses: []string{epIP},
257 }}
258 eps.Ports = []discovery.EndpointPort{{
259 Name: ptr.To(svcPortName.Port),
260 Port: ptr.To(int32(svcPort)),
261 Protocol: ptr.To(v1.ProtocolTCP),
262 }}
263 }),
264 )
265
266 fp.syncProxyRules()
267
268
269 if CleanupLeftovers(ipvs, ipt, ipset) {
270 t.Errorf("Cleanup leftovers failed")
271 }
272 }
273
274 func TestCanUseIPVSProxier(t *testing.T) {
275 testCases := []struct {
276 name string
277 scheduler string
278 ipsetVersion string
279 ipsetErr error
280 ipvsErr string
281 ok bool
282 }{
283 {
284 name: "happy days",
285 ipsetVersion: MinIPSetCheckVersion,
286 ok: true,
287 },
288 {
289 name: "ipset error",
290 scheduler: "",
291 ipsetVersion: MinIPSetCheckVersion,
292 ipsetErr: fmt.Errorf("oops"),
293 ok: false,
294 },
295 {
296 name: "ipset version too low",
297 scheduler: "rr",
298 ipsetVersion: "4.3.0",
299 ok: false,
300 },
301 {
302 name: "GetVirtualServers fail",
303 ipsetVersion: MinIPSetCheckVersion,
304 ipvsErr: "GetVirtualServers",
305 ok: false,
306 },
307 {
308 name: "AddVirtualServer fail",
309 ipsetVersion: MinIPSetCheckVersion,
310 ipvsErr: "AddVirtualServer",
311 ok: false,
312 },
313 {
314 name: "DeleteVirtualServer fail",
315 ipsetVersion: MinIPSetCheckVersion,
316 ipvsErr: "DeleteVirtualServer",
317 ok: false,
318 },
319 }
320
321 for _, tc := range testCases {
322 ipvs := &fakeIpvs{tc.ipvsErr, false}
323 versioner := &fakeIPSetVersioner{version: tc.ipsetVersion, err: tc.ipsetErr}
324 err := CanUseIPVSProxier(ipvs, versioner, tc.scheduler)
325 if (err == nil) != tc.ok {
326 t.Errorf("Case [%s], expect %v, got err: %v", tc.name, tc.ok, err)
327 }
328 }
329 }
330
331 func TestGetNodeIPs(t *testing.T) {
332 testCases := []struct {
333 isIPv6 bool
334 devAddresses map[string][]string
335 expectIPs []string
336 }{
337
338 {
339 devAddresses: map[string][]string{"eth0": {"1.2.3.4"}, "lo": {"127.0.0.1"}},
340 expectIPs: []string{"1.2.3.4"},
341 },
342
343 {
344 devAddresses: map[string][]string{"lo": {"127.0.0.1"}},
345 expectIPs: []string{},
346 },
347
348 {
349 devAddresses: map[string][]string{},
350 expectIPs: []string{},
351 },
352
353 {
354 devAddresses: map[string][]string{"encap0": {"10.20.30.40", "fe80::200:ff:fe01:1"}, "lo": {"127.0.0.1", "::1"}, "docker0": {"172.17.0.1"}},
355 expectIPs: []string{"10.20.30.40", "172.17.0.1"},
356 },
357
358 {
359 devAddresses: map[string][]string{"encaps9": {"10.20.30.40"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"1000::", "10.20.30.31"}},
360 expectIPs: []string{"10.20.30.40", "10.20.30.31"},
361 },
362
363 {
364 devAddresses: map[string][]string{"kube-ipvs0": {"2000::", "1.2.3.4"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"1000::", "10.20.30.31"}},
365 expectIPs: []string{"10.20.30.31"},
366 },
367
368 {
369 devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}, "lo": {"127.0.0.1", "::1"}},
370 expectIPs: []string{},
371 },
372
373 {
374 devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}},
375 expectIPs: []string{},
376 },
377
378 {
379 devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}, "eth5": {"3.4.5.6"}, "lo": {"127.0.0.1", "::1"}},
380 expectIPs: []string{"3.4.5.6"},
381 },
382
383 {
384 devAddresses: map[string][]string{"ipvs0": {"1.2.3.4"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"10.20.30.31"}},
385 expectIPs: []string{"10.20.30.31", "1.2.3.4"},
386 },
387
388 {
389 isIPv6: true,
390 devAddresses: map[string][]string{"ipvs0": {"1.2.3.4", "1000::"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"10.20.30.31", "2000::", "fe80::200:ff:fe01:1"}},
391 expectIPs: []string{"1000::", "2000::"},
392 },
393
394 {
395 isIPv6: true,
396 devAddresses: map[string][]string{"ipvs0": {"1.2.3.4", "1000::"}, "lo": {"127.0.0.1", "::1"}, "encap7": {"10.20.30.31", "2000::", "fe80::200:ff:fe01:1"}, "kube-ipvs0": {"1.2.3.4", "2.3.4.5", "2000::"}},
397 expectIPs: []string{"1000::"},
398 },
399 }
400
401 for i, tc := range testCases {
402 fake := netlinktest.NewFakeNetlinkHandle(tc.isIPv6)
403 for dev, addresses := range testCases[i].devAddresses {
404 fake.SetLocalAddresses(dev, addresses...)
405 }
406 ips, err := fake.GetAllLocalAddresses()
407 if err != nil {
408 t.Errorf("Unexpected error: %v", err)
409 }
410 devIps, err := fake.GetLocalAddresses("kube-ipvs0")
411 if err != nil {
412 t.Errorf("Unexpected error: %v", err)
413 }
414 ips = ips.Difference(devIps)
415 if !ips.Equal(sets.New(tc.expectIPs...)) {
416 t.Errorf("case[%d], unexpected mismatch, expected: %v, got: %v", i, tc.expectIPs, ips)
417 }
418 }
419 }
420
421 func TestNodePortIPv4(t *testing.T) {
422 tests := []struct {
423 name string
424 services []*v1.Service
425 endpoints []*discovery.EndpointSlice
426 nodeIPs []string
427 nodePortAddresses []string
428 expectedIPVS *ipvstest.FakeIPVS
429 expectedIPSets netlinktest.ExpectedIPSet
430 expectedIptablesChains netlinktest.ExpectedIptablesChain
431 }{
432 {
433 name: "1 service with node port, has 2 endpoints",
434 services: []*v1.Service{
435 makeTestService("ns1", "svc1", func(svc *v1.Service) {
436 svc.Spec.Type = "NodePort"
437 svc.Spec.ClusterIP = "10.20.30.41"
438 svc.Spec.Ports = []v1.ServicePort{{
439 Name: "p80",
440 Port: int32(80),
441 Protocol: v1.ProtocolTCP,
442 NodePort: int32(3001),
443 }}
444 }),
445 },
446 endpoints: []*discovery.EndpointSlice{
447 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
448 eps.AddressType = discovery.AddressTypeIPv4
449 eps.Endpoints = []discovery.Endpoint{{
450 Addresses: []string{"10.180.0.1"},
451 }}
452 eps.Ports = []discovery.EndpointPort{{
453 Name: ptr.To("p80"),
454 Port: ptr.To[int32](80),
455 Protocol: ptr.To(v1.ProtocolTCP),
456 }}
457 }),
458 makeTestEndpointSlice("ns1", "svc1", 2, func(eps *discovery.EndpointSlice) {
459 eps.AddressType = discovery.AddressTypeIPv6
460 eps.Endpoints = []discovery.Endpoint{{
461 Addresses: []string{"1002:ab8::2:10"},
462 }}
463 eps.Ports = []discovery.EndpointPort{{
464 Name: ptr.To("p80"),
465 Port: ptr.To[int32](80),
466 Protocol: ptr.To(v1.ProtocolTCP),
467 }}
468 }),
469 },
470 nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"},
471 nodePortAddresses: []string{},
472 expectedIPVS: &ipvstest.FakeIPVS{
473 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
474 {
475 IP: "10.20.30.41",
476 Port: 80,
477 Protocol: "TCP",
478 }: {
479 Address: netutils.ParseIPSloppy("10.20.30.41"),
480 Protocol: "TCP",
481 Port: uint16(80),
482 Scheduler: "rr",
483 },
484 {
485 IP: "100.101.102.103",
486 Port: 3001,
487 Protocol: "TCP",
488 }: {
489 Address: netutils.ParseIPSloppy("100.101.102.103"),
490 Protocol: "TCP",
491 Port: uint16(3001),
492 Scheduler: "rr",
493 },
494 },
495 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
496 {
497 IP: "10.20.30.41",
498 Port: 80,
499 Protocol: "TCP",
500 }: {
501 {
502 Address: netutils.ParseIPSloppy("10.180.0.1"),
503 Port: uint16(80),
504 Weight: 1,
505 },
506 },
507 {
508 IP: "100.101.102.103",
509 Port: 3001,
510 Protocol: "TCP",
511 }: {
512 {
513 Address: netutils.ParseIPSloppy("10.180.0.1"),
514 Port: uint16(80),
515 Weight: 1,
516 },
517 },
518 },
519 },
520 },
521 {
522 name: "1 UDP service with node port, has endpoints",
523 services: []*v1.Service{
524 makeTestService("ns1", "svc1", func(svc *v1.Service) {
525 svc.Spec.Type = "NodePort"
526 svc.Spec.ClusterIP = "10.20.30.41"
527 svc.Spec.Ports = []v1.ServicePort{{
528 Name: "p80",
529 Port: int32(80),
530 Protocol: v1.ProtocolUDP,
531 NodePort: int32(3001),
532 }}
533 }),
534 },
535 endpoints: []*discovery.EndpointSlice{
536 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
537 eps.AddressType = discovery.AddressTypeIPv4
538 eps.Endpoints = []discovery.Endpoint{{
539 Addresses: []string{"10.180.0.1"},
540 }}
541 eps.Ports = []discovery.EndpointPort{{
542 Name: ptr.To("p80"),
543 Port: ptr.To[int32](80),
544 Protocol: ptr.To(v1.ProtocolUDP),
545 }}
546 }),
547 },
548 nodeIPs: []string{"100.101.102.103"},
549 nodePortAddresses: []string{"0.0.0.0/0"},
550 expectedIPVS: &ipvstest.FakeIPVS{
551 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
552 {
553 IP: "10.20.30.41",
554 Port: 80,
555 Protocol: "UDP",
556 }: {
557 Address: netutils.ParseIPSloppy("10.20.30.41"),
558 Protocol: "UDP",
559 Port: uint16(80),
560 Scheduler: "rr",
561 },
562 {
563 IP: "100.101.102.103",
564 Port: 3001,
565 Protocol: "UDP",
566 }: {
567 Address: netutils.ParseIPSloppy("100.101.102.103"),
568 Protocol: "UDP",
569 Port: uint16(3001),
570 Scheduler: "rr",
571 },
572 },
573 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
574 {
575 IP: "10.20.30.41",
576 Port: 80,
577 Protocol: "UDP",
578 }: {
579 {
580 Address: netutils.ParseIPSloppy("10.180.0.1"),
581 Port: uint16(80),
582 Weight: 1,
583 },
584 },
585 {
586 IP: "100.101.102.103",
587 Port: 3001,
588 Protocol: "UDP",
589 }: {
590 {
591 Address: netutils.ParseIPSloppy("10.180.0.1"),
592 Port: uint16(80),
593 Weight: 1,
594 },
595 },
596 },
597 },
598 expectedIPSets: netlinktest.ExpectedIPSet{
599 kubeNodePortSetUDP: {{
600 Port: 3001,
601 Protocol: strings.ToLower(string(v1.ProtocolUDP)),
602 SetType: utilipset.BitmapPort,
603 }},
604 },
605 expectedIptablesChains: netlinktest.ExpectedIptablesChain{
606 string(kubeNodePortChain): {{
607 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeNodePortSetUDP,
608 }, {
609 JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
610 }},
611 string(kubeServicesChain): {{
612 JumpChain: "RETURN", SourceAddress: "127.0.0.0/8",
613 }, {
614 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet,
615 }, {
616 JumpChain: string(kubeNodePortChain), MatchSet: "",
617 }, {
618 JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
619 }},
620 },
621 },
622 {
623 name: "service has node port but no endpoints",
624 services: []*v1.Service{
625 makeTestService("ns1", "svc1", func(svc *v1.Service) {
626 svc.Spec.Type = "NodePort"
627 svc.Spec.ClusterIP = "10.20.30.41"
628 svc.Spec.Ports = []v1.ServicePort{{
629 Name: "p80",
630 Port: int32(80),
631 Protocol: v1.ProtocolTCP,
632 NodePort: int32(3001),
633 }}
634 }),
635 },
636 endpoints: []*discovery.EndpointSlice{},
637 nodeIPs: []string{"100.101.102.103"},
638 nodePortAddresses: []string{},
639 expectedIPVS: &ipvstest.FakeIPVS{
640 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
641 {
642 IP: "10.20.30.41",
643 Port: 80,
644 Protocol: "TCP",
645 }: {
646 Address: netutils.ParseIPSloppy("10.20.30.41"),
647 Protocol: "TCP",
648 Port: uint16(80),
649 Scheduler: "rr",
650 },
651 {
652 IP: "100.101.102.103",
653 Port: 3001,
654 Protocol: "TCP",
655 }: {
656 Address: netutils.ParseIPSloppy("100.101.102.103"),
657 Protocol: "TCP",
658 Port: uint16(3001),
659 Scheduler: "rr",
660 },
661 },
662 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
663 {
664 IP: "10.20.30.41",
665 Port: 80,
666 Protocol: "TCP",
667 }: {},
668 {
669 IP: "100.101.102.103",
670 Port: 3001,
671 Protocol: "TCP",
672 }: {},
673 },
674 },
675 },
676 {
677 name: "node port service with protocol sctp on a node with multiple nodeIPs",
678 services: []*v1.Service{
679 makeTestService("ns1", "svc1", func(svc *v1.Service) {
680 svc.Spec.Type = "NodePort"
681 svc.Spec.ClusterIP = "10.20.30.41"
682 svc.Spec.Ports = []v1.ServicePort{{
683 Name: "p80",
684 Port: int32(80),
685 Protocol: v1.ProtocolSCTP,
686 NodePort: int32(3001),
687 }}
688 }),
689 },
690 endpoints: []*discovery.EndpointSlice{
691 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
692 eps.AddressType = discovery.AddressTypeIPv4
693 eps.Endpoints = []discovery.Endpoint{{
694 Addresses: []string{"10.180.0.1"},
695 }}
696 eps.Ports = []discovery.EndpointPort{{
697 Name: ptr.To("p80"),
698 Port: ptr.To[int32](80),
699 Protocol: ptr.To(v1.ProtocolSCTP),
700 }}
701 }),
702 },
703 nodeIPs: []string{
704 "100.101.102.103",
705 "100.101.102.104",
706 "100.101.102.105",
707 "2001:db8::1:1",
708 "2001:db8::1:2",
709 "2001:db8::1:3",
710 },
711 nodePortAddresses: []string{},
712 expectedIPVS: &ipvstest.FakeIPVS{
713 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
714 {
715 IP: "10.20.30.41",
716 Port: 80,
717 Protocol: "SCTP",
718 }: {
719 Address: netutils.ParseIPSloppy("10.20.30.41"),
720 Protocol: "SCTP",
721 Port: uint16(80),
722 Scheduler: "rr",
723 },
724 {
725 IP: "100.101.102.103",
726 Port: 3001,
727 Protocol: "SCTP",
728 }: {
729 Address: netutils.ParseIPSloppy("100.101.102.103"),
730 Protocol: "SCTP",
731 Port: uint16(3001),
732 Scheduler: "rr",
733 },
734 {
735 IP: "100.101.102.104",
736 Port: 3001,
737 Protocol: "SCTP",
738 }: {
739 Address: netutils.ParseIPSloppy("100.101.102.104"),
740 Protocol: "SCTP",
741 Port: uint16(3001),
742 Scheduler: "rr",
743 },
744 {
745 IP: "100.101.102.105",
746 Port: 3001,
747 Protocol: "SCTP",
748 }: {
749 Address: netutils.ParseIPSloppy("100.101.102.105"),
750 Protocol: "SCTP",
751 Port: uint16(3001),
752 Scheduler: "rr",
753 },
754 },
755 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
756 {
757 IP: "10.20.30.41",
758 Port: 80,
759 Protocol: "SCTP",
760 }: {
761 {
762 Address: netutils.ParseIPSloppy("10.180.0.1"),
763 Port: uint16(80),
764 Weight: 1,
765 },
766 },
767 {
768 IP: "100.101.102.103",
769 Port: 3001,
770 Protocol: "SCTP",
771 }: {
772 {
773 Address: netutils.ParseIPSloppy("10.180.0.1"),
774 Port: uint16(80),
775 Weight: 1,
776 },
777 },
778 {
779 IP: "100.101.102.104",
780 Port: 3001,
781 Protocol: "SCTP",
782 }: {
783 {
784 Address: netutils.ParseIPSloppy("10.180.0.1"),
785 Port: uint16(80),
786 Weight: 1,
787 },
788 },
789 {
790 IP: "100.101.102.105",
791 Port: 3001,
792 Protocol: "SCTP",
793 }: {
794 {
795 Address: netutils.ParseIPSloppy("10.180.0.1"),
796 Port: uint16(80),
797 Weight: 1,
798 },
799 },
800 },
801 },
802 expectedIPSets: netlinktest.ExpectedIPSet{
803 kubeNodePortSetSCTP: {
804 {
805 IP: "100.101.102.103",
806 Port: 3001,
807 Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
808 SetType: utilipset.HashIPPort,
809 },
810 {
811 IP: "100.101.102.104",
812 Port: 3001,
813 Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
814 SetType: utilipset.HashIPPort,
815 },
816 {
817 IP: "100.101.102.105",
818 Port: 3001,
819 Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
820 SetType: utilipset.HashIPPort,
821 },
822 },
823 },
824 },
825 {
826 name: "node port service with protocol sctp and externalTrafficPolicy local",
827 services: []*v1.Service{
828 makeTestService("ns1", "svc1", func(svc *v1.Service) {
829 svc.Spec.Type = "NodePort"
830 svc.Spec.ClusterIP = "10.20.30.41"
831 svc.Spec.Ports = []v1.ServicePort{{
832 Name: "p80",
833 Port: int32(80),
834 Protocol: v1.ProtocolSCTP,
835 NodePort: int32(3001),
836 }}
837 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
838 }),
839 },
840 endpoints: []*discovery.EndpointSlice{
841 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
842 eps.AddressType = discovery.AddressTypeIPv4
843 eps.Endpoints = []discovery.Endpoint{{
844 Addresses: []string{"10.180.0.1"},
845 NodeName: ptr.To(testHostname),
846 }, {
847 Addresses: []string{"10.180.1.1"},
848 NodeName: ptr.To("otherHost"),
849 }}
850 eps.Ports = []discovery.EndpointPort{{
851 Name: ptr.To("p80"),
852 Port: ptr.To[int32](80),
853 Protocol: ptr.To(v1.ProtocolSCTP),
854 }}
855 }),
856 },
857 nodeIPs: []string{"100.101.102.103"},
858 nodePortAddresses: []string{},
859 expectedIPVS: &ipvstest.FakeIPVS{
860 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
861 {
862 IP: "10.20.30.41",
863 Port: 80,
864 Protocol: "SCTP",
865 }: {
866 Address: netutils.ParseIPSloppy("10.20.30.41"),
867 Protocol: "SCTP",
868 Port: uint16(80),
869 Scheduler: "rr",
870 },
871 {
872 IP: "100.101.102.103",
873 Port: 3001,
874 Protocol: "SCTP",
875 }: {
876 Address: netutils.ParseIPSloppy("100.101.102.103"),
877 Protocol: "SCTP",
878 Port: uint16(3001),
879 Scheduler: "rr",
880 },
881 },
882 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
883 {
884 IP: "10.20.30.41",
885 Port: 80,
886 Protocol: "SCTP",
887 }: {
888 {
889 Address: netutils.ParseIPSloppy("10.180.0.1"),
890 Port: uint16(80),
891 Weight: 1,
892 },
893 {
894 Address: netutils.ParseIPSloppy("10.180.1.1"),
895 Port: uint16(80),
896 Weight: 1,
897 },
898 },
899 {
900 IP: "100.101.102.103",
901 Port: 3001,
902 Protocol: "SCTP",
903 }: {
904 {
905 Address: netutils.ParseIPSloppy("10.180.0.1"),
906 Port: uint16(80),
907 Weight: 1,
908 },
909 },
910 },
911 },
912 expectedIPSets: netlinktest.ExpectedIPSet{
913 kubeNodePortSetSCTP: {
914 {
915 IP: "100.101.102.103",
916 Port: 3001,
917 Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
918 SetType: utilipset.HashIPPort,
919 },
920 },
921 kubeNodePortLocalSetSCTP: {
922 {
923 IP: "100.101.102.103",
924 Port: 3001,
925 Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
926 SetType: utilipset.HashIPPort,
927 },
928 },
929 },
930 expectedIptablesChains: netlinktest.ExpectedIptablesChain{
931 string(kubeNodePortChain): {{
932 JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetSCTP,
933 }, {
934 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeNodePortSetSCTP,
935 }, {
936 JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
937 }},
938 },
939 },
940 }
941
942 for _, test := range tests {
943 t.Run(test.name, func(t *testing.T) {
944 ipt := iptablestest.NewFake()
945 ipvs := ipvstest.NewFake()
946 ipset := ipsettest.NewFake(testIPSetVersion)
947 fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv4Protocol)
948 fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, test.nodePortAddresses, nil)
949
950 makeServiceMap(fp, test.services...)
951 populateEndpointSlices(fp, test.endpoints...)
952
953 fp.syncProxyRules()
954
955 if !reflect.DeepEqual(ipvs, test.expectedIPVS) {
956 t.Logf("actual ipvs state: %+v", ipvs)
957 t.Logf("expected ipvs state: %+v", test.expectedIPVS)
958 t.Errorf("unexpected IPVS state")
959 }
960
961 if test.expectedIPSets != nil {
962 checkIPSet(t, fp, test.expectedIPSets)
963 }
964
965 if test.expectedIptablesChains != nil {
966 checkIptables(t, ipt, test.expectedIptablesChains)
967 }
968 })
969 }
970 }
971
972 func TestNodePortIPv6(t *testing.T) {
973 tests := []struct {
974 name string
975 services []*v1.Service
976 endpoints []*discovery.EndpointSlice
977 nodeIPs []string
978 nodePortAddresses []string
979 expectedIPVS *ipvstest.FakeIPVS
980 expectedIPSets netlinktest.ExpectedIPSet
981 expectedIptablesChains netlinktest.ExpectedIptablesChain
982 }{
983 {
984 name: "1 service with node port, has 2 endpoints",
985 services: []*v1.Service{
986 makeTestService("ns1", "svc1", func(svc *v1.Service) {
987 svc.Spec.Type = "NodePort"
988 svc.Spec.ClusterIP = "2020::1"
989 svc.Spec.Ports = []v1.ServicePort{{
990 Name: "p80",
991 Port: int32(80),
992 Protocol: v1.ProtocolTCP,
993 NodePort: int32(3001),
994 }}
995 }),
996 },
997 endpoints: []*discovery.EndpointSlice{
998 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
999 eps.AddressType = discovery.AddressTypeIPv4
1000 eps.Endpoints = []discovery.Endpoint{{
1001 Addresses: []string{"10.180.0.1"},
1002 }}
1003 eps.Ports = []discovery.EndpointPort{{
1004 Name: ptr.To("p80"),
1005 Port: ptr.To[int32](80),
1006 Protocol: ptr.To(v1.ProtocolTCP),
1007 }}
1008 }),
1009 makeTestEndpointSlice("ns1", "svc1", 2, func(eps *discovery.EndpointSlice) {
1010 eps.AddressType = discovery.AddressTypeIPv6
1011 eps.Endpoints = []discovery.Endpoint{{
1012 Addresses: []string{"1002:ab8::2:10"},
1013 }}
1014 eps.Ports = []discovery.EndpointPort{{
1015 Name: ptr.To("p80"),
1016 Port: ptr.To[int32](80),
1017 Protocol: ptr.To(v1.ProtocolTCP),
1018 }}
1019 }),
1020 },
1021 nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"},
1022 nodePortAddresses: []string{},
1023 expectedIPVS: &ipvstest.FakeIPVS{
1024 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
1025 {
1026 IP: "2001:db8::1:1",
1027 Port: 3001,
1028 Protocol: "TCP",
1029 }: {
1030 Address: netutils.ParseIPSloppy("2001:db8::1:1"),
1031 Protocol: "TCP",
1032 Port: uint16(3001),
1033 Scheduler: "rr",
1034 },
1035 {
1036 IP: "2020::1",
1037 Port: 80,
1038 Protocol: "TCP",
1039 }: {
1040 Address: netutils.ParseIPSloppy("2020::1"),
1041 Protocol: "TCP",
1042 Port: uint16(80),
1043 Scheduler: "rr",
1044 },
1045 },
1046 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
1047 {
1048 IP: "2001:db8::1:1",
1049 Port: 3001,
1050 Protocol: "TCP",
1051 }: {
1052 {
1053 Address: netutils.ParseIPSloppy("1002:ab8::2:10"),
1054 Port: uint16(80),
1055 Weight: 1,
1056 },
1057 },
1058
1059 {
1060 IP: "2020::1",
1061 Port: 80,
1062 Protocol: "TCP",
1063 }: {
1064 {
1065 Address: netutils.ParseIPSloppy("1002:ab8::2:10"),
1066 Port: uint16(80),
1067 Weight: 1,
1068 },
1069 },
1070 },
1071 },
1072 },
1073
1074 {
1075 name: "1 UDP service with node port, has endpoints (no action on IPv6 Proxier)",
1076 services: []*v1.Service{
1077 makeTestService("ns1", "svc1", func(svc *v1.Service) {
1078 svc.Spec.Type = "NodePort"
1079 svc.Spec.ClusterIP = "10.20.30.41"
1080 svc.Spec.Ports = []v1.ServicePort{{
1081 Name: "p80",
1082 Port: int32(80),
1083 Protocol: v1.ProtocolUDP,
1084 NodePort: int32(3001),
1085 }}
1086 }),
1087 },
1088 endpoints: []*discovery.EndpointSlice{
1089 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
1090 eps.AddressType = discovery.AddressTypeIPv6
1091 eps.Endpoints = []discovery.Endpoint{{
1092 Addresses: []string{"10.180.0.1"},
1093 }}
1094 eps.Ports = []discovery.EndpointPort{{
1095 Name: ptr.To("p80"),
1096 Port: ptr.To[int32](80),
1097 Protocol: ptr.To(v1.ProtocolUDP),
1098 }}
1099 }),
1100 },
1101 nodeIPs: []string{"100.101.102.103"},
1102 nodePortAddresses: []string{"0.0.0.0/0"},
1103
1104 expectedIPVS: &ipvstest.FakeIPVS{
1105 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{},
1106 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{},
1107 },
1108 expectedIPSets: nil,
1109 expectedIptablesChains: nil,
1110 },
1111
1112 {
1113 name: "service has node port but no endpoints",
1114 services: []*v1.Service{
1115 makeTestService("ns1", "svc1", func(svc *v1.Service) {
1116 svc.Spec.Type = "NodePort"
1117 svc.Spec.ClusterIP = "2020::1"
1118 svc.Spec.Ports = []v1.ServicePort{{
1119 Name: "p80",
1120 Port: int32(80),
1121 Protocol: v1.ProtocolTCP,
1122 NodePort: int32(3001),
1123 }}
1124 }),
1125 },
1126 endpoints: []*discovery.EndpointSlice{},
1127 nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"},
1128 nodePortAddresses: []string{},
1129 expectedIPVS: &ipvstest.FakeIPVS{
1130 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
1131 {
1132 IP: "2001:db8::1:1",
1133 Port: 3001,
1134 Protocol: "TCP",
1135 }: {
1136 Address: netutils.ParseIPSloppy("2001:db8::1:1"),
1137 Protocol: "TCP",
1138 Port: uint16(3001),
1139 Scheduler: "rr",
1140 },
1141 {
1142 IP: "2020::1",
1143 Port: 80,
1144 Protocol: "TCP",
1145 }: {
1146 Address: netutils.ParseIPSloppy("2020::1"),
1147 Protocol: "TCP",
1148 Port: uint16(80),
1149 Scheduler: "rr",
1150 },
1151 },
1152 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
1153 {
1154 IP: "2020::1",
1155 Port: 80,
1156 Protocol: "TCP",
1157 }: {},
1158 {
1159 IP: "2001:db8::1:1",
1160 Port: 3001,
1161 Protocol: "TCP",
1162 }: {},
1163 },
1164 },
1165 },
1166
1167 {
1168 name: "node port service with protocol sctp on a node with multiple nodeIPs",
1169 services: []*v1.Service{
1170 makeTestService("ns1", "svc1", func(svc *v1.Service) {
1171 svc.Spec.Type = "NodePort"
1172 svc.Spec.ClusterIP = "2020::1"
1173 svc.Spec.Ports = []v1.ServicePort{{
1174 Name: "p80",
1175 Port: int32(80),
1176 Protocol: v1.ProtocolSCTP,
1177 NodePort: int32(3001),
1178 }}
1179 }),
1180 },
1181 endpoints: []*discovery.EndpointSlice{
1182 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
1183 eps.AddressType = discovery.AddressTypeIPv6
1184 eps.Endpoints = []discovery.Endpoint{{
1185 Addresses: []string{"2001::1"},
1186 }}
1187 eps.Ports = []discovery.EndpointPort{{
1188 Name: ptr.To("p80"),
1189 Port: ptr.To[int32](80),
1190 Protocol: ptr.To(v1.ProtocolSCTP),
1191 }}
1192 }),
1193 },
1194 nodeIPs: []string{"2001:db8::1:1", "2001:db8::1:2"},
1195 nodePortAddresses: []string{},
1196 expectedIPVS: &ipvstest.FakeIPVS{
1197 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
1198 {
1199 IP: "2001:db8::1:1",
1200 Port: 3001,
1201 Protocol: "SCTP",
1202 }: {
1203 Address: netutils.ParseIPSloppy("2001:db8::1:1"),
1204 Protocol: "SCTP",
1205 Port: uint16(3001),
1206 Scheduler: "rr",
1207 },
1208 {
1209 IP: "2001:db8::1:2",
1210 Port: 3001,
1211 Protocol: "SCTP",
1212 }: {
1213 Address: netutils.ParseIPSloppy("2001:db8::1:2"),
1214 Protocol: "SCTP",
1215 Port: uint16(3001),
1216 Scheduler: "rr",
1217 },
1218 {
1219 IP: "2020::1",
1220 Port: 80,
1221 Protocol: "SCTP",
1222 }: {
1223 Address: netutils.ParseIPSloppy("2020::1"),
1224 Protocol: "SCTP",
1225 Port: uint16(80),
1226 Scheduler: "rr",
1227 },
1228 },
1229 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
1230 {
1231 IP: "2001:db8::1:1",
1232 Port: 3001,
1233 Protocol: "SCTP",
1234 }: {
1235 {
1236 Address: netutils.ParseIPSloppy("2001::1"),
1237 Port: uint16(80),
1238 Weight: 1,
1239 },
1240 },
1241 {
1242 IP: "2001:db8::1:2",
1243 Port: 3001,
1244 Protocol: "SCTP",
1245 }: {
1246 {
1247 Address: netutils.ParseIPSloppy("2001::1"),
1248 Port: uint16(80),
1249 Weight: 1,
1250 },
1251 },
1252 {
1253 IP: "2020::1",
1254 Port: 80,
1255 Protocol: "SCTP",
1256 }: {
1257 {
1258 Address: netutils.ParseIPSloppy("2001::1"),
1259 Port: uint16(80),
1260 Weight: 1,
1261 },
1262 },
1263 },
1264 },
1265 expectedIPSets: netlinktest.ExpectedIPSet{
1266 kubeNodePortSetSCTP: {
1267 {
1268 IP: "2001:db8::1:1",
1269 Port: 3001,
1270 Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
1271 SetType: utilipset.HashIPPort,
1272 },
1273 {
1274 IP: "2001:db8::1:2",
1275 Port: 3001,
1276 Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
1277 SetType: utilipset.HashIPPort,
1278 },
1279 },
1280 },
1281 },
1282 }
1283
1284 for _, test := range tests {
1285 t.Run(test.name, func(t *testing.T) {
1286 ipt := iptablestest.NewFake()
1287 ipvs := ipvstest.NewFake()
1288 ipset := ipsettest.NewFake(testIPSetVersion)
1289 fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv6Protocol)
1290 fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv6Protocol, test.nodePortAddresses, nil)
1291
1292 makeServiceMap(fp, test.services...)
1293 populateEndpointSlices(fp, test.endpoints...)
1294
1295 fp.syncProxyRules()
1296
1297 if !reflect.DeepEqual(ipvs, test.expectedIPVS) {
1298 t.Logf("actual ipvs state: %+v", ipvs)
1299 t.Logf("expected ipvs state: %+v", test.expectedIPVS)
1300 t.Errorf("unexpected IPVS state")
1301 }
1302
1303 if test.expectedIPSets != nil {
1304 checkIPSet(t, fp, test.expectedIPSets)
1305 }
1306
1307 if test.expectedIptablesChains != nil {
1308 checkIptables(t, ipt, test.expectedIptablesChains)
1309 }
1310 })
1311 }
1312 }
1313
1314 func Test_syncEndpoint_updateWeightsOnRestart(t *testing.T) {
1315 ipt := iptablestest.NewFake()
1316 ipvs := ipvstest.NewFake()
1317 ipset := ipsettest.NewFake(testIPSetVersion)
1318 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
1319
1320 svc1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
1321 svc.Spec.ClusterIP = "10.20.30.41"
1322 svc.Spec.Ports = []v1.ServicePort{{
1323 Name: "p80",
1324 Port: int32(80),
1325 Protocol: v1.ProtocolTCP,
1326 }}
1327 })
1328 epSlice1 := makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
1329 eps.AddressType = discovery.AddressTypeIPv4
1330 eps.Endpoints = []discovery.Endpoint{{
1331 Addresses: []string{"10.180.0.1"},
1332 }}
1333 eps.Ports = []discovery.EndpointPort{{
1334 Name: ptr.To("p80"),
1335 Port: ptr.To[int32](80),
1336 Protocol: ptr.To(v1.ProtocolTCP),
1337 }}
1338 })
1339
1340
1341 makeServiceMap(fp, svc1)
1342 makeEndpointSliceMap(fp, epSlice1)
1343 fp.syncProxyRules()
1344
1345 serv := &utilipvs.VirtualServer{
1346 Address: netutils.ParseIPSloppy("10.20.30.41"),
1347 Port: uint16(80),
1348 Protocol: string(v1.ProtocolTCP),
1349 Scheduler: fp.ipvsScheduler,
1350 }
1351
1352 vs, err := fp.ipvs.GetVirtualServer(serv)
1353 if err != nil {
1354 t.Errorf("failed to get virtual server, err: %v", err)
1355 }
1356
1357 rss, err := fp.ipvs.GetRealServers(vs)
1358 if err != nil {
1359 t.Errorf("failed to get real servers, err: %v", err)
1360 }
1361 for _, rs := range rss {
1362 rs.Weight = 0
1363 if err = fp.ipvs.UpdateRealServer(vs, rs); err != nil {
1364 t.Errorf("failed to update real server: %v, err: %v", rs, err)
1365 }
1366 }
1367
1368
1369 fp.initialSync = true
1370 err = fp.syncEndpoint(proxy.ServicePortName{
1371 NamespacedName: types.NamespacedName{
1372 Name: "svc1",
1373 Namespace: "ns1",
1374 },
1375 Port: "80",
1376 Protocol: v1.ProtocolTCP,
1377 }, true, vs)
1378 if err != nil {
1379 t.Errorf("failed to sync endpoint, err: %v", err)
1380 }
1381
1382 rss, err = fp.ipvs.GetRealServers(vs)
1383 if err != nil {
1384 t.Errorf("failed to get real server, err: %v", err)
1385 }
1386 for _, rs := range rss {
1387 if rs.Weight != 1 {
1388 t.Logf("unexpected realserver weight: %d, expected weight: 1", rs.Weight)
1389 t.Errorf("unexpected realserver state")
1390 }
1391 }
1392 }
1393
1394 func TestIPv4Proxier(t *testing.T) {
1395 tests := []struct {
1396 name string
1397 services []*v1.Service
1398 endpoints []*discovery.EndpointSlice
1399 expectedIPVS *ipvstest.FakeIPVS
1400 }{
1401 {
1402 name: "2 services with Cluster IP, each with endpoints",
1403 services: []*v1.Service{
1404 makeTestService("ns1", "svc1", func(svc *v1.Service) {
1405 svc.Spec.ClusterIP = "10.20.30.41"
1406 svc.Spec.Ports = []v1.ServicePort{{
1407 Name: "p80",
1408 Port: int32(80),
1409 Protocol: v1.ProtocolTCP,
1410 }}
1411 }),
1412 makeTestService("ns2", "svc2", func(svc *v1.Service) {
1413 svc.Spec.ClusterIP = "1002:ab8::2:1"
1414 svc.Spec.Ports = []v1.ServicePort{{
1415 Name: "p8080",
1416 Port: int32(8080),
1417 Protocol: v1.ProtocolTCP,
1418 }}
1419 }),
1420 },
1421 endpoints: []*discovery.EndpointSlice{
1422 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
1423 eps.AddressType = discovery.AddressTypeIPv4
1424 eps.Endpoints = []discovery.Endpoint{{
1425 Addresses: []string{"10.180.0.1"},
1426 }}
1427 eps.Ports = []discovery.EndpointPort{{
1428 Name: ptr.To("p80"),
1429 Port: ptr.To[int32](80),
1430 Protocol: ptr.To(v1.ProtocolTCP),
1431 }}
1432 }),
1433 makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) {
1434 eps.AddressType = discovery.AddressTypeIPv6
1435 eps.Endpoints = []discovery.Endpoint{{
1436 Addresses: []string{"1009:ab8::5:6"},
1437 }}
1438 eps.Ports = []discovery.EndpointPort{{
1439 Name: ptr.To("p8080"),
1440 Port: ptr.To[int32](8080),
1441 Protocol: ptr.To(v1.ProtocolTCP),
1442 }}
1443 }),
1444 },
1445 expectedIPVS: &ipvstest.FakeIPVS{
1446 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
1447 {
1448 IP: "10.20.30.41",
1449 Port: 80,
1450 Protocol: "TCP",
1451 }: {
1452 Address: netutils.ParseIPSloppy("10.20.30.41"),
1453 Protocol: "TCP",
1454 Port: uint16(80),
1455 Scheduler: "rr",
1456 },
1457 },
1458 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
1459 {
1460 IP: "10.20.30.41",
1461 Port: 80,
1462 Protocol: "TCP",
1463 }: {
1464 {
1465 Address: netutils.ParseIPSloppy("10.180.0.1"),
1466 Port: uint16(80),
1467 Weight: 1,
1468 },
1469 },
1470 },
1471 },
1472 },
1473 {
1474 name: "cluster IP service with no endpoints",
1475 services: []*v1.Service{
1476 makeTestService("ns1", "svc1", func(svc *v1.Service) {
1477 svc.Spec.ClusterIP = "10.20.30.41"
1478 svc.Spec.Ports = []v1.ServicePort{{
1479 Name: "p80",
1480 Port: int32(80),
1481 Protocol: v1.ProtocolTCP,
1482 }}
1483 }),
1484 },
1485 endpoints: []*discovery.EndpointSlice{},
1486 expectedIPVS: &ipvstest.FakeIPVS{
1487 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
1488 {
1489 IP: "10.20.30.41",
1490 Port: 80,
1491 Protocol: "TCP",
1492 }: {
1493 Address: netutils.ParseIPSloppy("10.20.30.41"),
1494 Protocol: "TCP",
1495 Port: uint16(80),
1496 Scheduler: "rr",
1497 },
1498 },
1499 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
1500 {
1501 IP: "10.20.30.41",
1502 Port: 80,
1503 Protocol: "TCP",
1504 }: {},
1505 },
1506 },
1507 },
1508 }
1509
1510 for _, test := range tests {
1511 t.Run(test.name, func(t *testing.T) {
1512 ipt := iptablestest.NewFake()
1513 ipvs := ipvstest.NewFake()
1514 ipset := ipsettest.NewFake(testIPSetVersion)
1515 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
1516
1517 makeServiceMap(fp, test.services...)
1518 populateEndpointSlices(fp, test.endpoints...)
1519
1520 fp.syncProxyRules()
1521
1522 if !reflect.DeepEqual(ipvs, test.expectedIPVS) {
1523 t.Logf("actual ipvs state: %v", ipvs)
1524 t.Logf("expected ipvs state: %v", test.expectedIPVS)
1525 t.Errorf("unexpected IPVS state")
1526 }
1527 })
1528 }
1529 }
1530
1531 func TestIPv6Proxier(t *testing.T) {
1532 tests := []struct {
1533 name string
1534 services []*v1.Service
1535 endpoints []*discovery.EndpointSlice
1536 expectedIPVS *ipvstest.FakeIPVS
1537 }{
1538 {
1539 name: "2 services with Cluster IP, each with endpoints",
1540 services: []*v1.Service{
1541 makeTestService("ns1", "svc1", func(svc *v1.Service) {
1542 svc.Spec.ClusterIP = "10.20.30.41"
1543 svc.Spec.Ports = []v1.ServicePort{{
1544 Name: "p80",
1545 Port: int32(80),
1546 Protocol: v1.ProtocolTCP,
1547 }}
1548 }),
1549 makeTestService("ns2", "svc2", func(svc *v1.Service) {
1550 svc.Spec.ClusterIP = "1002:ab8::2:1"
1551 svc.Spec.Ports = []v1.ServicePort{{
1552 Name: "p8080",
1553 Port: int32(8080),
1554 Protocol: v1.ProtocolTCP,
1555 }}
1556 }),
1557 },
1558 endpoints: []*discovery.EndpointSlice{
1559 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
1560 eps.AddressType = discovery.AddressTypeIPv4
1561 eps.Endpoints = []discovery.Endpoint{{
1562 Addresses: []string{"10.180.0.1"},
1563 }}
1564 eps.Ports = []discovery.EndpointPort{{
1565 Name: ptr.To("p80"),
1566 Port: ptr.To[int32](80),
1567 Protocol: ptr.To(v1.ProtocolTCP),
1568 }}
1569 }),
1570 makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) {
1571 eps.AddressType = discovery.AddressTypeIPv6
1572 eps.Endpoints = []discovery.Endpoint{{
1573 Addresses: []string{"1009:ab8::5:6"},
1574 }}
1575 eps.Ports = []discovery.EndpointPort{{
1576 Name: ptr.To("p8080"),
1577 Port: ptr.To[int32](8080),
1578 Protocol: ptr.To(v1.ProtocolTCP),
1579 }}
1580 }),
1581 },
1582 expectedIPVS: &ipvstest.FakeIPVS{
1583 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
1584 {
1585 IP: "1002:ab8::2:1",
1586 Port: 8080,
1587 Protocol: "TCP",
1588 }: {
1589 Address: netutils.ParseIPSloppy("1002:ab8::2:1"),
1590 Protocol: "TCP",
1591 Port: uint16(8080),
1592 Scheduler: "rr",
1593 },
1594 },
1595 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
1596 {
1597 IP: "1002:ab8::2:1",
1598 Port: 8080,
1599 Protocol: "TCP",
1600 }: {
1601 {
1602 Address: netutils.ParseIPSloppy("1009:ab8::5:6"),
1603 Port: uint16(8080),
1604 Weight: 1,
1605 },
1606 },
1607 },
1608 },
1609 },
1610 {
1611 name: "cluster IP service with no endpoints",
1612 services: []*v1.Service{
1613 makeTestService("ns1", "svc1", func(svc *v1.Service) {
1614 svc.Spec.ClusterIP = "2001::1"
1615 svc.Spec.Ports = []v1.ServicePort{{
1616 Name: "p80",
1617 Port: int32(80),
1618 Protocol: v1.ProtocolTCP,
1619 }}
1620 }),
1621 },
1622 endpoints: []*discovery.EndpointSlice{},
1623 expectedIPVS: &ipvstest.FakeIPVS{
1624 Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
1625 {
1626 IP: "2001::1",
1627 Port: 80,
1628 Protocol: "TCP",
1629 }: {
1630 Address: netutils.ParseIPSloppy("2001::1"),
1631 Protocol: "TCP",
1632 Port: uint16(80),
1633 Scheduler: "rr",
1634 },
1635 },
1636 Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
1637 {
1638 IP: "2001::1",
1639 Port: 80,
1640 Protocol: "TCP",
1641 }: {},
1642 },
1643 },
1644 },
1645 }
1646
1647 for _, test := range tests {
1648 t.Run(test.name, func(t *testing.T) {
1649 ipt := iptablestest.NewFake()
1650 ipvs := ipvstest.NewFake()
1651 ipset := ipsettest.NewFake(testIPSetVersion)
1652 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv6Protocol)
1653
1654 makeServiceMap(fp, test.services...)
1655 populateEndpointSlices(fp, test.endpoints...)
1656
1657 fp.syncProxyRules()
1658
1659 if !reflect.DeepEqual(ipvs, test.expectedIPVS) {
1660 t.Logf("actual ipvs state: %v", ipvs)
1661 t.Logf("expected ipvs state: %v", test.expectedIPVS)
1662 t.Errorf("unexpected IPVS state")
1663 }
1664 })
1665 }
1666 }
1667
1668 func TestMasqueradeRule(t *testing.T) {
1669 for _, testcase := range []bool{false, true} {
1670 ipt := iptablestest.NewFake().SetHasRandomFully(testcase)
1671 ipvs := ipvstest.NewFake()
1672 ipset := ipsettest.NewFake(testIPSetVersion)
1673 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
1674 makeServiceMap(fp)
1675 fp.syncProxyRules()
1676
1677 buf := bytes.NewBuffer(nil)
1678 _ = ipt.SaveInto(utiliptables.TableNAT, buf)
1679 natRules := strings.Split(buf.String(), "\n")
1680 var hasMasqueradeJump, hasMasqRandomFully bool
1681 for _, line := range natRules {
1682 rule, _ := iptablestest.ParseRule(line, false)
1683 if rule != nil && rule.Chain == kubePostroutingChain && rule.Jump != nil && rule.Jump.Value == "MASQUERADE" {
1684 hasMasqueradeJump = true
1685 if rule.RandomFully != nil {
1686 hasMasqRandomFully = true
1687 }
1688 break
1689 }
1690 }
1691
1692 if !hasMasqueradeJump {
1693 t.Errorf("Failed to find -j MASQUERADE in %s chain", kubePostroutingChain)
1694 }
1695 if hasMasqRandomFully != testcase {
1696 probs := map[bool]string{false: "found", true: "did not find"}
1697 t.Errorf("%s --random-fully in -j MASQUERADE rule in %s chain for HasRandomFully()=%v", probs[testcase], kubePostroutingChain, testcase)
1698 }
1699 }
1700 }
1701
1702 func TestExternalIPsNoEndpoint(t *testing.T) {
1703 ipt := iptablestest.NewFake()
1704 ipvs := ipvstest.NewFake()
1705 ipset := ipsettest.NewFake(testIPSetVersion)
1706 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
1707 svcIP := "10.20.30.41"
1708 svcPort := 80
1709 svcExternalIPs := "50.60.70.81"
1710 svcPortName := proxy.ServicePortName{
1711 NamespacedName: makeNSN("ns1", "svc1"),
1712 Port: "p80",
1713 }
1714
1715 makeServiceMap(fp,
1716 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
1717 svc.Spec.Type = "ClusterIP"
1718 svc.Spec.ClusterIP = svcIP
1719 svc.Spec.ExternalIPs = []string{svcExternalIPs}
1720 svc.Spec.Ports = []v1.ServicePort{{
1721 Name: svcPortName.Port,
1722 Port: int32(svcPort),
1723 Protocol: v1.ProtocolTCP,
1724 TargetPort: intstr.FromInt32(int32(svcPort)),
1725 }}
1726 }),
1727 )
1728 fp.syncProxyRules()
1729
1730
1731 services, err := ipvs.GetVirtualServers()
1732 if err != nil {
1733 t.Errorf("Failed to get ipvs services, err: %v", err)
1734 }
1735 if len(services) != 2 {
1736 t.Errorf("Expect 2 ipvs services, got %d", len(services))
1737 }
1738 found := false
1739 for _, svc := range services {
1740 if svc.Address.String() == svcExternalIPs && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) {
1741 found = true
1742 destinations, _ := ipvs.GetRealServers(svc)
1743 if len(destinations) != 0 {
1744 t.Errorf("Unexpected %d destinations, expect 0 destinations", len(destinations))
1745 }
1746 break
1747 }
1748 }
1749 if !found {
1750 t.Errorf("Expect external ip type service, got none")
1751 }
1752 }
1753
1754 func TestExternalIPs(t *testing.T) {
1755 ipt := iptablestest.NewFake()
1756 ipvs := ipvstest.NewFake()
1757 ipset := ipsettest.NewFake(testIPSetVersion)
1758 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
1759 svcIP := "10.20.30.41"
1760 svcPort := 80
1761 svcExternalIPs := sets.New[string]("50.60.70.81", "2012::51", "127.0.0.1")
1762 svcPortName := proxy.ServicePortName{
1763 NamespacedName: makeNSN("ns1", "svc1"),
1764 Port: "p80",
1765 }
1766
1767 makeServiceMap(fp,
1768 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
1769 svc.Spec.Type = "ClusterIP"
1770 svc.Spec.ClusterIP = svcIP
1771 svc.Spec.ExternalIPs = svcExternalIPs.UnsortedList()
1772 svc.Spec.Ports = []v1.ServicePort{{
1773 Name: svcPortName.Port,
1774 Port: int32(svcPort),
1775 Protocol: v1.ProtocolTCP,
1776 TargetPort: intstr.FromInt32(int32(svcPort)),
1777 }}
1778 }),
1779 )
1780
1781 epIP := "10.180.0.1"
1782 populateEndpointSlices(fp,
1783 makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
1784 eps.AddressType = discovery.AddressTypeIPv4
1785 eps.Endpoints = []discovery.Endpoint{{
1786 Addresses: []string{epIP},
1787 }}
1788 eps.Ports = []discovery.EndpointPort{{
1789 Name: ptr.To(svcPortName.Port),
1790 Port: ptr.To(int32(svcPort)),
1791 Protocol: ptr.To(v1.ProtocolUDP),
1792 }}
1793 }),
1794 )
1795
1796 fp.syncProxyRules()
1797
1798
1799 services, err := ipvs.GetVirtualServers()
1800 if err != nil {
1801 t.Errorf("Failed to get ipvs services, err: %v", err)
1802 }
1803 if len(services) != 3 {
1804 t.Errorf("Expect 3 ipvs services, got %d", len(services))
1805 }
1806 found := false
1807 for _, svc := range services {
1808 if svcExternalIPs.Has(svc.Address.String()) && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) {
1809 found = true
1810 destinations, _ := ipvs.GetRealServers(svc)
1811 for _, dest := range destinations {
1812 if dest.Address.String() != epIP || dest.Port != uint16(svcPort) {
1813 t.Errorf("service Endpoint mismatch ipvs service destination")
1814 }
1815 }
1816 break
1817 }
1818 }
1819 if !found {
1820 t.Errorf("Expect external ip type service, got none")
1821 }
1822 }
1823
1824 func TestOnlyLocalExternalIPs(t *testing.T) {
1825 ipt := iptablestest.NewFake()
1826 ipvs := ipvstest.NewFake()
1827 ipset := ipsettest.NewFake(testIPSetVersion)
1828 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
1829 svcIP := "10.20.30.41"
1830 svcPort := 80
1831 svcExternalIPs := sets.New[string]("50.60.70.81", "2012::51", "127.0.0.1")
1832 svcPortName := proxy.ServicePortName{
1833 NamespacedName: makeNSN("ns1", "svc1"),
1834 Port: "p80",
1835 }
1836
1837 makeServiceMap(fp,
1838 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
1839 svc.Spec.Type = "NodePort"
1840 svc.Spec.ClusterIP = svcIP
1841 svc.Spec.ExternalIPs = svcExternalIPs.UnsortedList()
1842 svc.Spec.Ports = []v1.ServicePort{{
1843 Name: svcPortName.Port,
1844 Port: int32(svcPort),
1845 Protocol: v1.ProtocolTCP,
1846 TargetPort: intstr.FromInt32(int32(svcPort)),
1847 }}
1848 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
1849 }),
1850 )
1851 epIP := "10.180.0.1"
1852 epIP1 := "10.180.1.1"
1853 thisHostname := testHostname
1854 otherHostname := "other-hostname"
1855 populateEndpointSlices(fp,
1856 makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
1857 eps.AddressType = discovery.AddressTypeIPv4
1858 eps.Endpoints = []discovery.Endpoint{{
1859 Addresses: []string{epIP},
1860 NodeName: ptr.To(thisHostname),
1861 },
1862 {
1863 Addresses: []string{epIP1},
1864 NodeName: ptr.To(otherHostname),
1865 }}
1866 eps.Ports = []discovery.EndpointPort{{
1867 Name: ptr.To(svcPortName.Port),
1868 Port: ptr.To(int32(svcPort)),
1869 Protocol: ptr.To(v1.ProtocolTCP),
1870 }}
1871 }),
1872 )
1873
1874 fp.syncProxyRules()
1875
1876
1877 services, err := ipvs.GetVirtualServers()
1878 if err != nil {
1879 t.Errorf("Failed to get ipvs services, err: %v", err)
1880 }
1881 if len(services) != 3 {
1882 t.Errorf("Expect 3 ipvs services, got %d", len(services))
1883 }
1884 found := false
1885 for _, svc := range services {
1886 if svcExternalIPs.Has(svc.Address.String()) && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) {
1887 found = true
1888 destinations, _ := ipvs.GetRealServers(svc)
1889 if len(destinations) != 1 {
1890 t.Errorf("Expect only 1 local endpoint. but got %v", len(destinations))
1891 }
1892 for _, dest := range destinations {
1893 if dest.Address.String() != epIP || dest.Port != uint16(svcPort) {
1894 t.Errorf("service Endpoint mismatch ipvs service destination")
1895 }
1896 }
1897 break
1898 }
1899 }
1900 if !found {
1901 t.Errorf("Expect external ip type service, got none")
1902 }
1903 }
1904
1905 func TestLoadBalancer(t *testing.T) {
1906 ipt, fp := buildFakeProxier()
1907 svcIP := "10.20.30.41"
1908 svcPort := 80
1909 svcNodePort := 3001
1910 svcLBIP := "1.2.3.4"
1911 svcPortName := proxy.ServicePortName{
1912 NamespacedName: makeNSN("ns1", "svc1"),
1913 Port: "p80",
1914 }
1915
1916 makeServiceMap(fp,
1917 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
1918 svc.Spec.Type = "LoadBalancer"
1919 svc.Spec.ClusterIP = svcIP
1920 svc.Spec.Ports = []v1.ServicePort{{
1921 Name: svcPortName.Port,
1922 Port: int32(svcPort),
1923 Protocol: v1.ProtocolTCP,
1924 NodePort: int32(svcNodePort),
1925 }}
1926 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
1927 IP: svcLBIP,
1928 }}
1929 }),
1930 )
1931
1932 epIP := "10.180.0.1"
1933 populateEndpointSlices(fp,
1934 makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
1935 eps.AddressType = discovery.AddressTypeIPv4
1936 eps.Endpoints = []discovery.Endpoint{{
1937 Addresses: []string{epIP},
1938 }}
1939 eps.Ports = []discovery.EndpointPort{{
1940 Name: ptr.To(svcPortName.Port),
1941 Port: ptr.To(int32(svcPort)),
1942 Protocol: ptr.To(v1.ProtocolUDP),
1943 }}
1944 }),
1945 )
1946
1947 fp.syncProxyRules()
1948
1949
1950 epVS := &netlinktest.ExpectedVirtualServer{
1951 VSNum: 2, IP: svcLBIP, Port: uint16(svcNodePort), Protocol: string(v1.ProtocolTCP),
1952 RS: []netlinktest.ExpectedRealServer{{
1953 IP: epIP, Port: uint16(svcPort),
1954 }}}
1955 checkIPVS(t, fp, epVS)
1956
1957
1958 epIPSet := netlinktest.ExpectedIPSet{
1959 kubeLoadBalancerSet: {{
1960 IP: svcLBIP,
1961 Port: svcPort,
1962 Protocol: strings.ToLower(string(v1.ProtocolTCP)),
1963 SetType: utilipset.HashIPPort,
1964 }},
1965 }
1966 checkIPSet(t, fp, epIPSet)
1967
1968
1969 epIpt := netlinktest.ExpectedIptablesChain{
1970 string(kubeServicesChain): {{
1971 JumpChain: "RETURN", SourceAddress: "127.0.0.0/8",
1972 }, {
1973 JumpChain: string(kubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
1974 }, {
1975 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet,
1976 }, {
1977 JumpChain: string(kubeNodePortChain), MatchSet: "",
1978 }, {
1979 JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
1980 }, {
1981 JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
1982 }},
1983 string(kubeLoadBalancerSet): {{
1984 JumpChain: string(kubeMarkMasqChain), MatchSet: "",
1985 }},
1986 }
1987 checkIptables(t, ipt, epIpt)
1988 }
1989
1990 func TestOnlyLocalNodePorts(t *testing.T) {
1991 nodeIP := netutils.ParseIPSloppy("100.101.102.103")
1992 ipt, fp := buildFakeProxier()
1993
1994 svcIP := "10.20.30.41"
1995 svcPort := 80
1996 svcNodePort := 3001
1997 svcPortName := proxy.ServicePortName{
1998 NamespacedName: makeNSN("ns1", "svc1"),
1999 Port: "p80",
2000 }
2001
2002 makeServiceMap(fp,
2003 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
2004 svc.Spec.Type = "NodePort"
2005 svc.Spec.ClusterIP = svcIP
2006 svc.Spec.Ports = []v1.ServicePort{{
2007 Name: svcPortName.Port,
2008 Port: int32(svcPort),
2009 Protocol: v1.ProtocolTCP,
2010 NodePort: int32(svcNodePort),
2011 }}
2012 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
2013 }),
2014 )
2015
2016 epIP := "10.180.0.1"
2017 epIP1 := "10.180.1.1"
2018
2019 populateEndpointSlices(fp,
2020 makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
2021 eps.AddressType = discovery.AddressTypeIPv4
2022 eps.Endpoints = []discovery.Endpoint{{
2023 Addresses: []string{epIP},
2024 NodeName: ptr.To(testHostname),
2025 }, {
2026 Addresses: []string{epIP1},
2027 NodeName: ptr.To("other-hostname"),
2028 }}
2029 eps.Ports = []discovery.EndpointPort{{
2030 Name: ptr.To(svcPortName.Port),
2031 Port: ptr.To(int32(svcPort)),
2032 Protocol: ptr.To(v1.ProtocolTCP),
2033 }}
2034 }),
2035 )
2036
2037 itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
2038 addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("100.101.102.103"), Mask: net.CIDRMask(24, 32)}}
2039 itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
2040 addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
2041 fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
2042 fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
2043 fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"100.101.102.0/24"}, nil)
2044
2045 fp.syncProxyRules()
2046
2047
2048 epVS := &netlinktest.ExpectedVirtualServer{
2049 VSNum: 2, IP: nodeIP.String(), Port: uint16(svcNodePort), Protocol: string(v1.ProtocolTCP),
2050 RS: []netlinktest.ExpectedRealServer{{
2051 IP: epIP, Port: uint16(svcPort),
2052 }}}
2053 checkIPVS(t, fp, epVS)
2054
2055
2056 epEntry := &utilipset.Entry{
2057 Port: svcNodePort,
2058 Protocol: strings.ToLower(string(v1.ProtocolTCP)),
2059 SetType: utilipset.BitmapPort,
2060 }
2061 epIPSet := netlinktest.ExpectedIPSet{
2062 kubeNodePortSetTCP: {epEntry},
2063 kubeNodePortLocalSetTCP: {epEntry},
2064 }
2065 checkIPSet(t, fp, epIPSet)
2066
2067
2068 epIpt := netlinktest.ExpectedIptablesChain{
2069 string(kubeServicesChain): {{
2070 JumpChain: "RETURN", SourceAddress: "127.0.0.0/8",
2071 }, {
2072 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet,
2073 }, {
2074 JumpChain: string(kubeNodePortChain), MatchSet: "",
2075 }, {
2076 JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
2077 }},
2078 string(kubeNodePortChain): {{
2079 JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
2080 }, {
2081 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
2082 }, {
2083 JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
2084 }},
2085 }
2086 checkIptables(t, ipt, epIpt)
2087 }
2088
2089 func TestHealthCheckNodePort(t *testing.T) {
2090 ipt, fp := buildFakeProxier()
2091
2092 svcIP := "10.20.30.41"
2093 svcPort := 80
2094 svcNodePort := 3000
2095 svcPortName := proxy.ServicePortName{
2096 NamespacedName: makeNSN("ns1", "svc1"),
2097 Port: "p80",
2098 }
2099
2100 sampleSvc := makeTestService(svcPortName.Namespace, "", func(svc *v1.Service) {
2101 svc.Spec.Type = "LoadBalancer"
2102 svc.Spec.ClusterIP = svcIP
2103 svc.Spec.Ports = []v1.ServicePort{{
2104 Name: svcPortName.Port,
2105 Port: int32(svcPort),
2106 Protocol: v1.ProtocolTCP,
2107 NodePort: int32(svcNodePort),
2108 }}
2109 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
2110 })
2111
2112 svc1, svc2, invalidSvc3 := *sampleSvc, *sampleSvc, *sampleSvc
2113 svc1.Name, svc1.Spec.HealthCheckNodePort = "valid-svc1", 30000
2114 svc2.Name, svc2.Spec.HealthCheckNodePort = "valid-svc2", 30001
2115
2116 invalidSvc3.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
2117 invalidSvc3.Name, invalidSvc3.Spec.HealthCheckNodePort = "invalid-svc3", 30002
2118
2119 makeServiceMap(fp,
2120 &svc1,
2121 &svc2,
2122 &invalidSvc3,
2123 )
2124
2125 itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
2126 addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("100.101.102.103"), Mask: net.CIDRMask(24, 32)}}
2127 itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
2128 addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
2129 fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
2130 fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
2131 fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"100.101.102.0/24"}, nil)
2132
2133 fp.syncProxyRules()
2134
2135
2136 makeTCPEntry := func(port int) *utilipset.Entry {
2137 return &utilipset.Entry{
2138 Port: port,
2139 Protocol: strings.ToLower(string(v1.ProtocolTCP)),
2140 SetType: utilipset.BitmapPort,
2141 }
2142 }
2143 epIPSet := netlinktest.ExpectedIPSet{
2144
2145 kubeHealthCheckNodePortSet: {makeTCPEntry(30000), makeTCPEntry(30001)},
2146 }
2147 checkIPSet(t, fp, epIPSet)
2148
2149
2150 epIpt := netlinktest.ExpectedIptablesChain{
2151 string(kubeNodePortChain): {{
2152 JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
2153 }, {
2154 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
2155 }, {
2156 JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
2157 }},
2158 }
2159 checkIptables(t, ipt, epIpt)
2160 }
2161
2162 func TestLoadBalancerSourceRanges(t *testing.T) {
2163 ipt, fp := buildFakeProxier()
2164
2165 svcIP := "10.20.30.41"
2166 svcPort := 80
2167 svcLBIP := "1.2.3.4"
2168 svcLBSource := "10.0.0.0/8"
2169 svcPortName := proxy.ServicePortName{
2170 NamespacedName: makeNSN("ns1", "svc1"),
2171 Port: "p80",
2172 }
2173 epIP := "10.180.0.1"
2174
2175 makeServiceMap(fp,
2176 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
2177 svc.Spec.Type = "LoadBalancer"
2178 svc.Spec.ClusterIP = svcIP
2179 svc.Spec.Ports = []v1.ServicePort{{
2180 Name: svcPortName.Port,
2181 Port: int32(svcPort),
2182 Protocol: v1.ProtocolTCP,
2183 }}
2184 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
2185 IP: svcLBIP,
2186 }}
2187 svc.Spec.LoadBalancerSourceRanges = []string{
2188 svcLBSource,
2189 }
2190 }),
2191 )
2192 populateEndpointSlices(fp,
2193 makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
2194 eps.AddressType = discovery.AddressTypeIPv4
2195 eps.Endpoints = []discovery.Endpoint{{
2196 Addresses: []string{epIP},
2197 }}
2198 eps.Ports = []discovery.EndpointPort{{
2199 Name: ptr.To(svcPortName.Port),
2200 Port: ptr.To(int32(svcPort)),
2201 Protocol: ptr.To(v1.ProtocolTCP),
2202 }}
2203 }),
2204 )
2205
2206 fp.syncProxyRules()
2207
2208
2209 epVS := &netlinktest.ExpectedVirtualServer{
2210 VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(v1.ProtocolTCP),
2211 RS: []netlinktest.ExpectedRealServer{{
2212 IP: epIP, Port: uint16(svcPort),
2213 }}}
2214 checkIPVS(t, fp, epVS)
2215
2216
2217 epIPSet := netlinktest.ExpectedIPSet{
2218 kubeLoadBalancerSet: {{
2219 IP: svcLBIP,
2220 Port: svcPort,
2221 Protocol: strings.ToLower(string(v1.ProtocolTCP)),
2222 SetType: utilipset.HashIPPort,
2223 }},
2224 kubeLoadBalancerFWSet: {{
2225 IP: svcLBIP,
2226 Port: svcPort,
2227 Protocol: strings.ToLower(string(v1.ProtocolTCP)),
2228 SetType: utilipset.HashIPPort,
2229 }},
2230 kubeLoadBalancerSourceCIDRSet: {{
2231 IP: svcLBIP,
2232 Port: svcPort,
2233 Protocol: strings.ToLower(string(v1.ProtocolTCP)),
2234 Net: svcLBSource,
2235 SetType: utilipset.HashIPPortNet,
2236 }},
2237 }
2238 checkIPSet(t, fp, epIPSet)
2239
2240
2241 epIpt := netlinktest.ExpectedIptablesChain{
2242 string(kubeServicesChain): {{
2243 JumpChain: "RETURN", SourceAddress: "127.0.0.0/8",
2244 }, {
2245 JumpChain: string(kubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
2246 }, {
2247 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet,
2248 }, {
2249 JumpChain: string(kubeNodePortChain), MatchSet: "",
2250 }, {
2251 JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
2252 }, {
2253 JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
2254 }},
2255 string(kubeProxyFirewallChain): {{
2256 JumpChain: string(kubeSourceRangesFirewallChain), MatchSet: kubeLoadBalancerFWSet,
2257 }},
2258 string(kubeSourceRangesFirewallChain): {{
2259 JumpChain: "RETURN", MatchSet: kubeLoadBalancerSourceCIDRSet,
2260 }, {
2261 JumpChain: "DROP", MatchSet: "",
2262 }},
2263 }
2264 checkIptables(t, ipt, epIpt)
2265 }
2266
2267 func TestAcceptIPVSTraffic(t *testing.T) {
2268 ipt, fp := buildFakeProxier()
2269
2270 ingressIP := "1.2.3.4"
2271 externalIP := []string{"5.6.7.8"}
2272 svcInfos := []struct {
2273 svcType v1.ServiceType
2274 svcIP string
2275 svcName string
2276 epIP string
2277 }{
2278 {v1.ServiceTypeClusterIP, "10.20.30.40", "svc1", "10.180.0.1"},
2279 {v1.ServiceTypeLoadBalancer, "10.20.30.41", "svc2", "10.180.0.2"},
2280 {v1.ServiceTypeNodePort, "10.20.30.42", "svc3", "10.180.0.3"},
2281 }
2282
2283 for _, svcInfo := range svcInfos {
2284 makeServiceMap(fp,
2285 makeTestService("ns1", svcInfo.svcName, func(svc *v1.Service) {
2286 svc.Spec.Type = svcInfo.svcType
2287 svc.Spec.ClusterIP = svcInfo.svcIP
2288 svc.Spec.Ports = []v1.ServicePort{{
2289 Name: "p80",
2290 Port: 80,
2291 Protocol: v1.ProtocolTCP,
2292 NodePort: 80,
2293 }}
2294 if svcInfo.svcType == v1.ServiceTypeLoadBalancer {
2295 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
2296 IP: ingressIP,
2297 }}
2298 }
2299 if svcInfo.svcType == v1.ServiceTypeClusterIP {
2300 svc.Spec.ExternalIPs = externalIP
2301 }
2302 }),
2303 )
2304
2305 populateEndpointSlices(fp,
2306 makeTestEndpointSlice("ns1", "p80", 1, func(eps *discovery.EndpointSlice) {
2307 eps.Endpoints = []discovery.Endpoint{{
2308 Addresses: []string{svcInfo.epIP},
2309 }}
2310 eps.Ports = []discovery.EndpointPort{{
2311 Name: ptr.To("p80"),
2312 Port: ptr.To[int32](80),
2313 Protocol: ptr.To(v1.ProtocolUDP),
2314 }}
2315 }),
2316 )
2317 }
2318 fp.syncProxyRules()
2319
2320
2321 epIpt := netlinktest.ExpectedIptablesChain{
2322 string(kubeServicesChain): {
2323 {JumpChain: "RETURN", SourceAddress: "127.0.0.0/8"},
2324 {JumpChain: string(kubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet},
2325 {JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet},
2326 {JumpChain: string(kubeMarkMasqChain), MatchSet: kubeExternalIPSet},
2327 {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet},
2328 {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet},
2329 {JumpChain: string(kubeNodePortChain), MatchSet: ""},
2330 {JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet},
2331 {JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet},
2332 },
2333 }
2334 checkIptables(t, ipt, epIpt)
2335 }
2336
2337 func TestOnlyLocalLoadBalancing(t *testing.T) {
2338 ipt, fp := buildFakeProxier()
2339
2340 svcIP := "10.20.30.41"
2341 svcPort := 80
2342 svcNodePort := 3001
2343 svcLBIP := "1.2.3.4"
2344 svcPortName := proxy.ServicePortName{
2345 NamespacedName: makeNSN("ns1", "svc1"),
2346 Port: "p80",
2347 }
2348
2349 makeServiceMap(fp,
2350 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
2351 svc.Spec.Type = "LoadBalancer"
2352 svc.Spec.ClusterIP = svcIP
2353 svc.Spec.Ports = []v1.ServicePort{{
2354 Name: svcPortName.Port,
2355 Port: int32(svcPort),
2356 Protocol: v1.ProtocolTCP,
2357 NodePort: int32(svcNodePort),
2358 }}
2359 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
2360 IP: svcLBIP,
2361 }}
2362 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
2363 }),
2364 )
2365
2366 epIP := "10.180.0.1"
2367 epIP1 := "10.180.1.1"
2368
2369 populateEndpointSlices(fp,
2370 makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
2371 eps.AddressType = discovery.AddressTypeIPv4
2372 eps.Endpoints = []discovery.Endpoint{
2373 {
2374 Addresses: []string{epIP},
2375 NodeName: ptr.To(testHostname),
2376 },
2377 {
2378 Addresses: []string{epIP1},
2379 NodeName: ptr.To("other-hostname"),
2380 }}
2381 eps.Ports = []discovery.EndpointPort{{
2382 Name: ptr.To(svcPortName.Port),
2383 Port: ptr.To(int32(svcPort)),
2384 Protocol: ptr.To(v1.ProtocolTCP),
2385 }}
2386 }),
2387 )
2388
2389 fp.syncProxyRules()
2390
2391
2392 epVS := &netlinktest.ExpectedVirtualServer{
2393 VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(v1.ProtocolTCP),
2394 RS: []netlinktest.ExpectedRealServer{{
2395 IP: epIP, Port: uint16(svcPort),
2396 }}}
2397 checkIPVS(t, fp, epVS)
2398
2399
2400 epIPSet := netlinktest.ExpectedIPSet{
2401 kubeLoadBalancerSet: {{
2402 IP: svcLBIP,
2403 Port: svcPort,
2404 Protocol: strings.ToLower(string(v1.ProtocolTCP)),
2405 SetType: utilipset.HashIPPort,
2406 }},
2407 kubeLoadBalancerLocalSet: {{
2408 IP: svcLBIP,
2409 Port: svcPort,
2410 Protocol: strings.ToLower(string(v1.ProtocolTCP)),
2411 SetType: utilipset.HashIPPort,
2412 }},
2413 }
2414 checkIPSet(t, fp, epIPSet)
2415
2416
2417 epIpt := netlinktest.ExpectedIptablesChain{
2418 string(kubeServicesChain): {{
2419 JumpChain: "RETURN", SourceAddress: "127.0.0.0/8",
2420 }, {
2421 JumpChain: string(kubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
2422 }, {
2423 JumpChain: string(kubeMarkMasqChain), MatchSet: kubeClusterIPSet,
2424 }, {
2425 JumpChain: string(kubeNodePortChain), MatchSet: "",
2426 }, {
2427 JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
2428 }, {
2429 JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
2430 }},
2431 string(kubeLoadBalancerChain): {{
2432 JumpChain: "RETURN", MatchSet: kubeLoadBalancerLocalSet,
2433 }, {
2434 JumpChain: string(kubeMarkMasqChain), MatchSet: "",
2435 }},
2436 }
2437 checkIptables(t, ipt, epIpt)
2438 }
2439
2440 func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort {
2441 svcPort := v1.ServicePort{
2442 Name: name,
2443 Protocol: protocol,
2444 Port: port,
2445 NodePort: nodeport,
2446 TargetPort: intstr.FromInt32(int32(targetPort)),
2447 }
2448 return append(array, svcPort)
2449 }
2450
2451 func TestBuildServiceMapAddRemove(t *testing.T) {
2452 ipt := iptablestest.NewFake()
2453 ipvs := ipvstest.NewFake()
2454 ipset := ipsettest.NewFake(testIPSetVersion)
2455 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
2456
2457 services := []*v1.Service{
2458 makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
2459 svc.Spec.Type = v1.ServiceTypeClusterIP
2460 svc.Spec.ClusterIP = "172.16.55.4"
2461 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
2462 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
2463 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somesctp", "SCTP", 1236, 6321, 0)
2464 }),
2465 makeTestService("somewhere-else", "node-port", func(svc *v1.Service) {
2466 svc.Spec.Type = v1.ServiceTypeNodePort
2467 svc.Spec.ClusterIP = "172.16.55.10"
2468 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
2469 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
2470 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpblah", "SCTP", 343, 676, 0)
2471 }),
2472 makeTestService("somewhere", "load-balancer", func(svc *v1.Service) {
2473 svc.Spec.Type = v1.ServiceTypeLoadBalancer
2474 svc.Spec.ClusterIP = "172.16.55.11"
2475 svc.Spec.LoadBalancerIP = "5.6.7.8"
2476 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
2477 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
2478 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpfoo", "SCTP", 8677, 30063, 7002)
2479 svc.Status.LoadBalancer = v1.LoadBalancerStatus{
2480 Ingress: []v1.LoadBalancerIngress{
2481 {IP: "10.1.2.4"},
2482 },
2483 }
2484 }),
2485 makeTestService("somewhere", "only-local-load-balancer", func(svc *v1.Service) {
2486 svc.Spec.Type = v1.ServiceTypeLoadBalancer
2487 svc.Spec.ClusterIP = "172.16.55.12"
2488 svc.Spec.LoadBalancerIP = "5.6.7.8"
2489 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
2490 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
2491 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpbaz", "SCTP", 8679, 30065, 7004)
2492 svc.Status.LoadBalancer = v1.LoadBalancerStatus{
2493 Ingress: []v1.LoadBalancerIngress{
2494 {IP: "10.1.2.3"},
2495 },
2496 }
2497 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
2498 svc.Spec.HealthCheckNodePort = 345
2499 }),
2500 }
2501
2502 for i := range services {
2503 fp.OnServiceAdd(services[i])
2504 }
2505 result := fp.svcPortMap.Update(fp.serviceChanges)
2506 if len(fp.svcPortMap) != 12 {
2507 t.Errorf("expected service map length 12, got %v", fp.svcPortMap)
2508 }
2509
2510 if len(result.DeletedUDPClusterIPs) != 0 {
2511
2512 t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
2513 }
2514
2515
2516 healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
2517 if len(healthCheckNodePorts) != 1 {
2518 t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts)
2519 } else {
2520 nsn := makeNSN("somewhere", "only-local-load-balancer")
2521 if port, found := healthCheckNodePorts[nsn]; !found || port != 345 {
2522 t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts)
2523 }
2524 }
2525
2526
2527
2528 oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
2529 svc.Spec.Type = v1.ServiceTypeClusterIP
2530 svc.Spec.ClusterIP = "172.16.55.4"
2531 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
2532 })
2533
2534 fp.OnServiceUpdate(services[0], oneService)
2535 fp.OnServiceDelete(services[1])
2536 fp.OnServiceDelete(services[2])
2537 fp.OnServiceDelete(services[3])
2538
2539 result = fp.svcPortMap.Update(fp.serviceChanges)
2540 if len(fp.svcPortMap) != 1 {
2541 t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
2542 }
2543
2544
2545
2546
2547 expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
2548 if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) {
2549 t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.UnsortedList())
2550 }
2551 for _, ip := range expectedStaleUDPServices {
2552 if !result.DeletedUDPClusterIPs.Has(ip) {
2553 t.Errorf("expected stale UDP service service %s", ip)
2554 }
2555 }
2556
2557 healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
2558 if len(healthCheckNodePorts) != 0 {
2559 t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts)
2560 }
2561 }
2562
2563 func TestBuildServiceMapServiceHeadless(t *testing.T) {
2564 ipt := iptablestest.NewFake()
2565 ipvs := ipvstest.NewFake()
2566 ipset := ipsettest.NewFake(testIPSetVersion)
2567 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
2568
2569 makeServiceMap(fp,
2570 makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
2571 svc.Spec.Type = v1.ServiceTypeClusterIP
2572 svc.Spec.ClusterIP = v1.ClusterIPNone
2573 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
2574 }),
2575 makeTestService("somewhere-else", "headless-without-port", func(svc *v1.Service) {
2576 svc.Spec.Type = v1.ServiceTypeClusterIP
2577 svc.Spec.ClusterIP = v1.ClusterIPNone
2578 }),
2579 makeTestService("somewhere-else", "headless-sctp", func(svc *v1.Service) {
2580 svc.Spec.Type = v1.ServiceTypeClusterIP
2581 svc.Spec.ClusterIP = v1.ClusterIPNone
2582 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 1235, 0, 0)
2583 }),
2584 )
2585
2586
2587 result := fp.svcPortMap.Update(fp.serviceChanges)
2588 if len(fp.svcPortMap) != 0 {
2589 t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
2590 }
2591
2592 if len(result.DeletedUDPClusterIPs) != 0 {
2593 t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
2594 }
2595
2596
2597 healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
2598 if len(healthCheckNodePorts) != 0 {
2599 t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts))
2600 }
2601 }
2602
2603 func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
2604 ipt := iptablestest.NewFake()
2605 ipvs := ipvstest.NewFake()
2606 ipset := ipsettest.NewFake(testIPSetVersion)
2607 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
2608
2609 makeServiceMap(fp,
2610 makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
2611 svc.Spec.Type = v1.ServiceTypeExternalName
2612 svc.Spec.ClusterIP = "172.16.55.4"
2613 svc.Spec.ExternalName = "foo2.bar.com"
2614 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
2615 }),
2616 )
2617
2618 result := fp.svcPortMap.Update(fp.serviceChanges)
2619 if len(fp.svcPortMap) != 0 {
2620 t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
2621 }
2622 if len(result.DeletedUDPClusterIPs) != 0 {
2623 t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
2624 }
2625
2626
2627 healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
2628 if len(healthCheckNodePorts) != 0 {
2629 t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
2630 }
2631 }
2632
2633 func TestBuildServiceMapServiceUpdate(t *testing.T) {
2634 ipt := iptablestest.NewFake()
2635 ipvs := ipvstest.NewFake()
2636 ipset := ipsettest.NewFake(testIPSetVersion)
2637 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
2638
2639 servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
2640 svc.Spec.Type = v1.ServiceTypeClusterIP
2641 svc.Spec.ClusterIP = "172.16.55.4"
2642 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
2643 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
2644 })
2645 servicev2 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
2646 svc.Spec.Type = v1.ServiceTypeLoadBalancer
2647 svc.Spec.ClusterIP = "172.16.55.4"
2648 svc.Spec.LoadBalancerIP = "5.6.7.8"
2649 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
2650 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
2651 svc.Status.LoadBalancer = v1.LoadBalancerStatus{
2652 Ingress: []v1.LoadBalancerIngress{
2653 {IP: "10.1.2.3"},
2654 },
2655 }
2656 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
2657 svc.Spec.HealthCheckNodePort = 345
2658 })
2659
2660 fp.OnServiceAdd(servicev1)
2661
2662 result := fp.svcPortMap.Update(fp.serviceChanges)
2663 if len(fp.svcPortMap) != 2 {
2664 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
2665 }
2666 if len(result.DeletedUDPClusterIPs) != 0 {
2667
2668 t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
2669 }
2670
2671 healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
2672 if len(healthCheckNodePorts) != 0 {
2673 t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
2674 }
2675
2676
2677 fp.OnServiceUpdate(servicev1, servicev2)
2678 result = fp.svcPortMap.Update(fp.serviceChanges)
2679 if len(fp.svcPortMap) != 2 {
2680 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
2681 }
2682 if len(result.DeletedUDPClusterIPs) != 0 {
2683 t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
2684 }
2685
2686 healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
2687 if len(healthCheckNodePorts) != 1 {
2688 t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
2689 }
2690
2691
2692
2693 fp.OnServiceUpdate(servicev2, servicev2)
2694 result = fp.svcPortMap.Update(fp.serviceChanges)
2695 if len(fp.svcPortMap) != 2 {
2696 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
2697 }
2698 if len(result.DeletedUDPClusterIPs) != 0 {
2699 t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
2700 }
2701
2702 healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
2703 if len(healthCheckNodePorts) != 1 {
2704 t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
2705 }
2706
2707
2708 fp.OnServiceUpdate(servicev2, servicev1)
2709 result = fp.svcPortMap.Update(fp.serviceChanges)
2710 if len(fp.svcPortMap) != 2 {
2711 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
2712 }
2713 if len(result.DeletedUDPClusterIPs) != 0 {
2714
2715 t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
2716 }
2717
2718 healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
2719 if len(healthCheckNodePorts) != 0 {
2720 t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
2721 }
2722 }
2723
2724 func TestSessionAffinity(t *testing.T) {
2725 ipt := iptablestest.NewFake()
2726 ipvs := ipvstest.NewFake()
2727 ipset := ipsettest.NewFake(testIPSetVersion)
2728 nodeIP := "100.101.102.103"
2729 fp := NewFakeProxier(ipt, ipvs, ipset, []string{nodeIP}, nil, v1.IPv4Protocol)
2730 svcIP := "10.20.30.41"
2731 svcPort := 80
2732 svcNodePort := 3001
2733 svcExternalIPs := "50.60.70.81"
2734 svcPortName := proxy.ServicePortName{
2735 NamespacedName: makeNSN("ns1", "svc1"),
2736 Port: "p80",
2737 }
2738
2739 makeServiceMap(fp,
2740 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
2741 svc.Spec.Type = "NodePort"
2742 svc.Spec.ClusterIP = svcIP
2743 svc.Spec.ExternalIPs = []string{svcExternalIPs}
2744 svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
2745 svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
2746 ClientIP: &v1.ClientIPConfig{
2747 TimeoutSeconds: ptr.To[int32](v1.DefaultClientIPServiceAffinitySeconds),
2748 },
2749 }
2750 svc.Spec.Ports = []v1.ServicePort{{
2751 Name: svcPortName.Port,
2752 Port: int32(svcPort),
2753 Protocol: v1.ProtocolTCP,
2754 NodePort: int32(svcNodePort),
2755 }}
2756 }),
2757 )
2758 fp.syncProxyRules()
2759
2760
2761 services, err := ipvs.GetVirtualServers()
2762 if err != nil {
2763 t.Errorf("Failed to get ipvs services, err: %v", err)
2764 }
2765 for _, svc := range services {
2766 if svc.Timeout != uint32(v1.DefaultClientIPServiceAffinitySeconds) {
2767 t.Errorf("Unexpected mismatch ipvs service session affinity timeout: %d, expected: %d", svc.Timeout, v1.DefaultClientIPServiceAffinitySeconds)
2768 }
2769 }
2770 }
2771
2772 func makeServicePortName(ns, name, port string, protocol v1.Protocol) proxy.ServicePortName {
2773 return proxy.ServicePortName{
2774 NamespacedName: makeNSN(ns, name),
2775 Port: port,
2776 Protocol: protocol,
2777 }
2778 }
2779
2780 func Test_updateEndpointsMap(t *testing.T) {
2781 emptyEndpointSlices := []*discovery.EndpointSlice{
2782 makeTestEndpointSlice("ns1", "ep1", 1, func(*discovery.EndpointSlice) {}),
2783 }
2784 subset1 := func(eps *discovery.EndpointSlice) {
2785 eps.AddressType = discovery.AddressTypeIPv4
2786 eps.Endpoints = []discovery.Endpoint{{
2787 Addresses: []string{"1.1.1.1"},
2788 }}
2789 eps.Ports = []discovery.EndpointPort{{
2790 Name: ptr.To("p11"),
2791 Port: ptr.To[int32](11),
2792 Protocol: ptr.To(v1.ProtocolUDP),
2793 }}
2794 }
2795 subset2 := func(eps *discovery.EndpointSlice) {
2796 eps.AddressType = discovery.AddressTypeIPv4
2797 eps.Endpoints = []discovery.Endpoint{{
2798 Addresses: []string{"1.1.1.2"},
2799 }}
2800 eps.Ports = []discovery.EndpointPort{{
2801 Name: ptr.To("p12"),
2802 Port: ptr.To[int32](12),
2803 Protocol: ptr.To(v1.ProtocolUDP),
2804 }}
2805 }
2806 namedPortLocal := []*discovery.EndpointSlice{
2807 makeTestEndpointSlice("ns1", "ep1", 1,
2808 func(eps *discovery.EndpointSlice) {
2809 eps.AddressType = discovery.AddressTypeIPv4
2810 eps.Endpoints = []discovery.Endpoint{{
2811 Addresses: []string{"1.1.1.1"},
2812 NodeName: ptr.To(testHostname),
2813 }}
2814 eps.Ports = []discovery.EndpointPort{{
2815 Name: ptr.To("p11"),
2816 Port: ptr.To[int32](11),
2817 Protocol: ptr.To(v1.ProtocolUDP),
2818 }}
2819 }),
2820 }
2821 namedPort := []*discovery.EndpointSlice{
2822 makeTestEndpointSlice("ns1", "ep1", 1, subset1),
2823 }
2824 namedPortRenamed := []*discovery.EndpointSlice{
2825 makeTestEndpointSlice("ns1", "ep1", 1,
2826 func(eps *discovery.EndpointSlice) {
2827 eps.AddressType = discovery.AddressTypeIPv4
2828 eps.Endpoints = []discovery.Endpoint{{
2829 Addresses: []string{"1.1.1.1"},
2830 }}
2831 eps.Ports = []discovery.EndpointPort{{
2832 Name: ptr.To("p11-2"),
2833 Port: ptr.To[int32](11),
2834 Protocol: ptr.To(v1.ProtocolUDP),
2835 }}
2836 }),
2837 }
2838 namedPortRenumbered := []*discovery.EndpointSlice{
2839 makeTestEndpointSlice("ns1", "ep1", 1,
2840 func(eps *discovery.EndpointSlice) {
2841 eps.AddressType = discovery.AddressTypeIPv4
2842 eps.Endpoints = []discovery.Endpoint{{
2843 Addresses: []string{"1.1.1.1"},
2844 }}
2845 eps.Ports = []discovery.EndpointPort{{
2846 Name: ptr.To("p11"),
2847 Port: ptr.To[int32](22),
2848 Protocol: ptr.To(v1.ProtocolUDP),
2849 }}
2850 }),
2851 }
2852 namedPortsLocalNoLocal := []*discovery.EndpointSlice{
2853 makeTestEndpointSlice("ns1", "ep1", 1,
2854 func(eps *discovery.EndpointSlice) {
2855 eps.AddressType = discovery.AddressTypeIPv4
2856 eps.Endpoints = []discovery.Endpoint{{
2857 Addresses: []string{"1.1.1.1"},
2858 }, {
2859 Addresses: []string{"1.1.1.2"},
2860 NodeName: ptr.To(testHostname),
2861 }}
2862 eps.Ports = []discovery.EndpointPort{{
2863 Name: ptr.To("p11"),
2864 Port: ptr.To[int32](11),
2865 Protocol: ptr.To(v1.ProtocolUDP),
2866 }, {
2867 Name: ptr.To("p12"),
2868 Port: ptr.To[int32](12),
2869 Protocol: ptr.To(v1.ProtocolUDP),
2870 }}
2871 }),
2872 }
2873 multipleSubsets := []*discovery.EndpointSlice{
2874 makeTestEndpointSlice("ns1", "ep1", 1, subset1),
2875 makeTestEndpointSlice("ns1", "ep1", 2, subset2),
2876 }
2877 subsetLocal := func(eps *discovery.EndpointSlice) {
2878 eps.AddressType = discovery.AddressTypeIPv4
2879 eps.Endpoints = []discovery.Endpoint{{
2880 Addresses: []string{"1.1.1.2"},
2881 NodeName: ptr.To(testHostname),
2882 }}
2883 eps.Ports = []discovery.EndpointPort{{
2884 Name: ptr.To("p12"),
2885 Port: ptr.To[int32](12),
2886 Protocol: ptr.To(v1.ProtocolUDP),
2887 }}
2888 }
2889 multipleSubsetsWithLocal := []*discovery.EndpointSlice{
2890 makeTestEndpointSlice("ns1", "ep1", 1, subset1),
2891 makeTestEndpointSlice("ns1", "ep1", 2, subsetLocal),
2892 }
2893 subsetMultiplePortsLocal := func(eps *discovery.EndpointSlice) {
2894 eps.AddressType = discovery.AddressTypeIPv4
2895 eps.Endpoints = []discovery.Endpoint{{
2896 Addresses: []string{"1.1.1.1"},
2897 NodeName: ptr.To(testHostname),
2898 }}
2899 eps.Ports = []discovery.EndpointPort{{
2900 Name: ptr.To("p11"),
2901 Port: ptr.To[int32](11),
2902 Protocol: ptr.To(v1.ProtocolUDP),
2903 }, {
2904 Name: ptr.To("p12"),
2905 Port: ptr.To[int32](12),
2906 Protocol: ptr.To(v1.ProtocolUDP),
2907 }}
2908 }
2909 subset3 := func(eps *discovery.EndpointSlice) {
2910 eps.AddressType = discovery.AddressTypeIPv4
2911 eps.Endpoints = []discovery.Endpoint{{
2912 Addresses: []string{"1.1.1.3"},
2913 }}
2914 eps.Ports = []discovery.EndpointPort{{
2915 Name: ptr.To("p13"),
2916 Port: ptr.To[int32](13),
2917 Protocol: ptr.To(v1.ProtocolUDP),
2918 }}
2919 }
2920 multipleSubsetsMultiplePortsLocal := []*discovery.EndpointSlice{
2921 makeTestEndpointSlice("ns1", "ep1", 1, subsetMultiplePortsLocal),
2922 makeTestEndpointSlice("ns1", "ep1", 2, subset3),
2923 }
2924 subsetMultipleIPsPorts1 := func(eps *discovery.EndpointSlice) {
2925 eps.AddressType = discovery.AddressTypeIPv4
2926 eps.Endpoints = []discovery.Endpoint{{
2927 Addresses: []string{"1.1.1.1"},
2928 }, {
2929 Addresses: []string{"1.1.1.2"},
2930 NodeName: ptr.To(testHostname),
2931 }}
2932 eps.Ports = []discovery.EndpointPort{{
2933 Name: ptr.To("p11"),
2934 Port: ptr.To[int32](11),
2935 Protocol: ptr.To(v1.ProtocolUDP),
2936 }, {
2937 Name: ptr.To("p12"),
2938 Port: ptr.To[int32](12),
2939 Protocol: ptr.To(v1.ProtocolUDP),
2940 }}
2941 }
2942 subsetMultipleIPsPorts2 := func(eps *discovery.EndpointSlice) {
2943 eps.AddressType = discovery.AddressTypeIPv4
2944 eps.Endpoints = []discovery.Endpoint{{
2945 Addresses: []string{"1.1.1.3"},
2946 }, {
2947 Addresses: []string{"1.1.1.4"},
2948 NodeName: ptr.To(testHostname),
2949 }}
2950 eps.Ports = []discovery.EndpointPort{{
2951 Name: ptr.To("p13"),
2952 Port: ptr.To[int32](13),
2953 Protocol: ptr.To(v1.ProtocolUDP),
2954 }, {
2955 Name: ptr.To("p14"),
2956 Port: ptr.To[int32](14),
2957 Protocol: ptr.To(v1.ProtocolUDP),
2958 }}
2959 }
2960 subsetMultipleIPsPorts3 := func(eps *discovery.EndpointSlice) {
2961 eps.AddressType = discovery.AddressTypeIPv4
2962 eps.Endpoints = []discovery.Endpoint{{
2963 Addresses: []string{"2.2.2.1"},
2964 }, {
2965 Addresses: []string{"2.2.2.2"},
2966 NodeName: ptr.To(testHostname),
2967 }}
2968 eps.Ports = []discovery.EndpointPort{{
2969 Name: ptr.To("p21"),
2970 Port: ptr.To[int32](21),
2971 Protocol: ptr.To(v1.ProtocolUDP),
2972 }, {
2973 Name: ptr.To("p22"),
2974 Port: ptr.To[int32](22),
2975 Protocol: ptr.To(v1.ProtocolUDP),
2976 }}
2977 }
2978 multipleSubsetsIPsPorts := []*discovery.EndpointSlice{
2979 makeTestEndpointSlice("ns1", "ep1", 1, subsetMultipleIPsPorts1),
2980 makeTestEndpointSlice("ns1", "ep1", 2, subsetMultipleIPsPorts2),
2981 makeTestEndpointSlice("ns2", "ep2", 1, subsetMultipleIPsPorts3),
2982 }
2983 complexSubset1 := func(eps *discovery.EndpointSlice) {
2984 eps.AddressType = discovery.AddressTypeIPv4
2985 eps.Endpoints = []discovery.Endpoint{{
2986 Addresses: []string{"2.2.2.2"},
2987 NodeName: ptr.To(testHostname),
2988 }, {
2989 Addresses: []string{"2.2.2.22"},
2990 NodeName: ptr.To(testHostname),
2991 }}
2992 eps.Ports = []discovery.EndpointPort{{
2993 Name: ptr.To("p22"),
2994 Port: ptr.To[int32](22),
2995 Protocol: ptr.To(v1.ProtocolUDP),
2996 }}
2997 }
2998 complexSubset2 := func(eps *discovery.EndpointSlice) {
2999 eps.AddressType = discovery.AddressTypeIPv4
3000 eps.Endpoints = []discovery.Endpoint{{
3001 Addresses: []string{"2.2.2.3"},
3002 NodeName: ptr.To(testHostname),
3003 }}
3004 eps.Ports = []discovery.EndpointPort{{
3005 Name: ptr.To("p23"),
3006 Port: ptr.To[int32](23),
3007 Protocol: ptr.To(v1.ProtocolUDP),
3008 }}
3009 }
3010 complexSubset3 := func(eps *discovery.EndpointSlice) {
3011 eps.AddressType = discovery.AddressTypeIPv4
3012 eps.Endpoints = []discovery.Endpoint{{
3013 Addresses: []string{"4.4.4.4"},
3014 NodeName: ptr.To(testHostname),
3015 }, {
3016 Addresses: []string{"4.4.4.5"},
3017 NodeName: ptr.To(testHostname),
3018 }}
3019 eps.Ports = []discovery.EndpointPort{{
3020 Name: ptr.To("p44"),
3021 Port: ptr.To[int32](44),
3022 Protocol: ptr.To(v1.ProtocolUDP),
3023 }}
3024 }
3025 complexSubset4 := func(eps *discovery.EndpointSlice) {
3026 eps.AddressType = discovery.AddressTypeIPv4
3027 eps.Endpoints = []discovery.Endpoint{{
3028 Addresses: []string{"4.4.4.6"},
3029 NodeName: ptr.To(testHostname),
3030 }}
3031 eps.Ports = []discovery.EndpointPort{{
3032 Name: ptr.To("p45"),
3033 Port: ptr.To[int32](45),
3034 Protocol: ptr.To(v1.ProtocolUDP),
3035 }}
3036 }
3037 complexSubset5 := func(eps *discovery.EndpointSlice) {
3038 eps.AddressType = discovery.AddressTypeIPv4
3039 eps.Endpoints = []discovery.Endpoint{{
3040 Addresses: []string{"1.1.1.1"},
3041 }, {
3042 Addresses: []string{"1.1.1.11"},
3043 }}
3044 eps.Ports = []discovery.EndpointPort{{
3045 Name: ptr.To("p11"),
3046 Port: ptr.To[int32](11),
3047 Protocol: ptr.To(v1.ProtocolUDP),
3048 }}
3049 }
3050 complexSubset6 := func(eps *discovery.EndpointSlice) {
3051 eps.AddressType = discovery.AddressTypeIPv4
3052 eps.Endpoints = []discovery.Endpoint{{
3053 Addresses: []string{"1.1.1.2"},
3054 }}
3055 eps.Ports = []discovery.EndpointPort{{
3056 Name: ptr.To("p12"),
3057 Port: ptr.To[int32](12),
3058 Protocol: ptr.To(v1.ProtocolUDP),
3059 }, {
3060 Name: ptr.To("p122"),
3061 Port: ptr.To[int32](122),
3062 Protocol: ptr.To(v1.ProtocolUDP),
3063 }}
3064 }
3065 complexSubset7 := func(eps *discovery.EndpointSlice) {
3066 eps.AddressType = discovery.AddressTypeIPv4
3067 eps.Endpoints = []discovery.Endpoint{{
3068 Addresses: []string{"3.3.3.3"},
3069 }}
3070 eps.Ports = []discovery.EndpointPort{{
3071 Name: ptr.To("p33"),
3072 Port: ptr.To[int32](33),
3073 Protocol: ptr.To(v1.ProtocolUDP),
3074 }}
3075 }
3076 complexSubset8 := func(eps *discovery.EndpointSlice) {
3077 eps.AddressType = discovery.AddressTypeIPv4
3078 eps.Endpoints = []discovery.Endpoint{{
3079 Addresses: []string{"4.4.4.4"},
3080 NodeName: ptr.To(testHostname),
3081 }}
3082 eps.Ports = []discovery.EndpointPort{{
3083 Name: ptr.To("p44"),
3084 Port: ptr.To[int32](44),
3085 Protocol: ptr.To(v1.ProtocolUDP),
3086 }}
3087 }
3088 complexBefore := []*discovery.EndpointSlice{
3089 makeTestEndpointSlice("ns1", "ep1", 1, subset1),
3090 nil,
3091 makeTestEndpointSlice("ns2", "ep2", 1, complexSubset1),
3092 makeTestEndpointSlice("ns2", "ep2", 2, complexSubset2),
3093 nil,
3094 makeTestEndpointSlice("ns4", "ep4", 1, complexSubset3),
3095 makeTestEndpointSlice("ns4", "ep4", 2, complexSubset4),
3096 }
3097 complexAfter := []*discovery.EndpointSlice{
3098 makeTestEndpointSlice("ns1", "ep1", 1, complexSubset5),
3099 makeTestEndpointSlice("ns1", "ep1", 2, complexSubset6),
3100 nil,
3101 nil,
3102 makeTestEndpointSlice("ns3", "ep3", 1, complexSubset7),
3103 makeTestEndpointSlice("ns4", "ep4", 1, complexSubset8),
3104 nil,
3105 }
3106
3107 testCases := []struct {
3108
3109
3110
3111 name string
3112 previousEndpoints []*discovery.EndpointSlice
3113 currentEndpoints []*discovery.EndpointSlice
3114 oldEndpoints map[proxy.ServicePortName][]endpointExpectation
3115 expectedResult map[proxy.ServicePortName][]endpointExpectation
3116 expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
3117 expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
3118 expectedReadyEndpoints map[types.NamespacedName]int
3119 }{{
3120
3121 name: "nothing",
3122 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
3123 expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
3124 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3125 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3126 expectedReadyEndpoints: map[types.NamespacedName]int{},
3127 }, {
3128
3129 name: "no change, named port, local",
3130 previousEndpoints: namedPortLocal,
3131 currentEndpoints: namedPortLocal,
3132 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3133 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3134 {endpoint: "1.1.1.1:11", isLocal: true},
3135 },
3136 },
3137 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3138 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3139 {endpoint: "1.1.1.1:11", isLocal: true},
3140 },
3141 },
3142 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3143 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3144 expectedReadyEndpoints: map[types.NamespacedName]int{
3145 makeNSN("ns1", "ep1"): 1,
3146 },
3147 }, {
3148
3149 name: "no change, multiple subsets",
3150 previousEndpoints: multipleSubsets,
3151 currentEndpoints: multipleSubsets,
3152 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3153 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3154 {endpoint: "1.1.1.1:11", isLocal: false},
3155 },
3156 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3157 {endpoint: "1.1.1.2:12", isLocal: false},
3158 },
3159 },
3160 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3161 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3162 {endpoint: "1.1.1.1:11", isLocal: false},
3163 },
3164 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3165 {endpoint: "1.1.1.2:12", isLocal: false},
3166 },
3167 },
3168 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3169 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3170 expectedReadyEndpoints: map[types.NamespacedName]int{},
3171 }, {
3172
3173 name: "no change, multiple subsets, multiple ports, local",
3174 previousEndpoints: multipleSubsetsMultiplePortsLocal,
3175 currentEndpoints: multipleSubsetsMultiplePortsLocal,
3176 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3177 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3178 {endpoint: "1.1.1.1:11", isLocal: true},
3179 },
3180 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3181 {endpoint: "1.1.1.1:12", isLocal: true},
3182 },
3183 makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
3184 {endpoint: "1.1.1.3:13", isLocal: false},
3185 },
3186 },
3187 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3188 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3189 {endpoint: "1.1.1.1:11", isLocal: true},
3190 },
3191 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3192 {endpoint: "1.1.1.1:12", isLocal: true},
3193 },
3194 makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
3195 {endpoint: "1.1.1.3:13", isLocal: false},
3196 },
3197 },
3198 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3199 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3200 expectedReadyEndpoints: map[types.NamespacedName]int{
3201 makeNSN("ns1", "ep1"): 1,
3202 },
3203 }, {
3204
3205 name: "no change, multiple endpoints, subsets, IPs, and ports",
3206 previousEndpoints: multipleSubsetsIPsPorts,
3207 currentEndpoints: multipleSubsetsIPsPorts,
3208 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3209 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3210 {endpoint: "1.1.1.1:11", isLocal: false},
3211 {endpoint: "1.1.1.2:11", isLocal: true},
3212 },
3213 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3214 {endpoint: "1.1.1.1:12", isLocal: false},
3215 {endpoint: "1.1.1.2:12", isLocal: true},
3216 },
3217 makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
3218 {endpoint: "1.1.1.3:13", isLocal: false},
3219 {endpoint: "1.1.1.4:13", isLocal: true},
3220 },
3221 makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
3222 {endpoint: "1.1.1.3:14", isLocal: false},
3223 {endpoint: "1.1.1.4:14", isLocal: true},
3224 },
3225 makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
3226 {endpoint: "2.2.2.1:21", isLocal: false},
3227 {endpoint: "2.2.2.2:21", isLocal: true},
3228 },
3229 makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
3230 {endpoint: "2.2.2.1:22", isLocal: false},
3231 {endpoint: "2.2.2.2:22", isLocal: true},
3232 },
3233 },
3234 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3235 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3236 {endpoint: "1.1.1.1:11", isLocal: false},
3237 {endpoint: "1.1.1.2:11", isLocal: true},
3238 },
3239 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3240 {endpoint: "1.1.1.1:12", isLocal: false},
3241 {endpoint: "1.1.1.2:12", isLocal: true},
3242 },
3243 makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
3244 {endpoint: "1.1.1.3:13", isLocal: false},
3245 {endpoint: "1.1.1.4:13", isLocal: true},
3246 },
3247 makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
3248 {endpoint: "1.1.1.3:14", isLocal: false},
3249 {endpoint: "1.1.1.4:14", isLocal: true},
3250 },
3251 makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
3252 {endpoint: "2.2.2.1:21", isLocal: false},
3253 {endpoint: "2.2.2.2:21", isLocal: true},
3254 },
3255 makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
3256 {endpoint: "2.2.2.1:22", isLocal: false},
3257 {endpoint: "2.2.2.2:22", isLocal: true},
3258 },
3259 },
3260 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3261 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3262 expectedReadyEndpoints: map[types.NamespacedName]int{
3263 makeNSN("ns1", "ep1"): 2,
3264 makeNSN("ns2", "ep2"): 1,
3265 },
3266 }, {
3267
3268 name: "add an Endpoints",
3269 previousEndpoints: []*discovery.EndpointSlice{nil},
3270 currentEndpoints: namedPortLocal,
3271 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
3272 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3273 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3274 {endpoint: "1.1.1.1:11", isLocal: true},
3275 },
3276 },
3277 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3278 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
3279 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
3280 },
3281 expectedReadyEndpoints: map[types.NamespacedName]int{
3282 makeNSN("ns1", "ep1"): 1,
3283 },
3284 }, {
3285
3286 name: "remove an Endpoints",
3287 previousEndpoints: namedPortLocal,
3288 currentEndpoints: []*discovery.EndpointSlice{nil},
3289 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3290 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3291 {endpoint: "1.1.1.1:11", isLocal: true},
3292 },
3293 },
3294 expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
3295 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
3296 Endpoint: "1.1.1.1:11",
3297 ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
3298 }},
3299 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3300 expectedReadyEndpoints: map[types.NamespacedName]int{},
3301 }, {
3302
3303 name: "add an IP and port",
3304 previousEndpoints: namedPort,
3305 currentEndpoints: namedPortsLocalNoLocal,
3306 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3307 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3308 {endpoint: "1.1.1.1:11", isLocal: false},
3309 },
3310 },
3311 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3312 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3313 {endpoint: "1.1.1.1:11", isLocal: false},
3314 {endpoint: "1.1.1.2:11", isLocal: true},
3315 },
3316 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3317 {endpoint: "1.1.1.1:12", isLocal: false},
3318 {endpoint: "1.1.1.2:12", isLocal: true},
3319 },
3320 },
3321 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3322 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
3323 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
3324 },
3325 expectedReadyEndpoints: map[types.NamespacedName]int{
3326 makeNSN("ns1", "ep1"): 1,
3327 },
3328 }, {
3329
3330 name: "remove an IP and port",
3331 previousEndpoints: namedPortsLocalNoLocal,
3332 currentEndpoints: namedPort,
3333 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3334 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3335 {endpoint: "1.1.1.1:11", isLocal: false},
3336 {endpoint: "1.1.1.2:11", isLocal: true},
3337 },
3338 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3339 {endpoint: "1.1.1.1:12", isLocal: false},
3340 {endpoint: "1.1.1.2:12", isLocal: true},
3341 },
3342 },
3343 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3344 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3345 {endpoint: "1.1.1.1:11", isLocal: false},
3346 },
3347 },
3348 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
3349 Endpoint: "1.1.1.2:11",
3350 ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
3351 }, {
3352 Endpoint: "1.1.1.1:12",
3353 ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
3354 }, {
3355 Endpoint: "1.1.1.2:12",
3356 ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
3357 }},
3358 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3359 expectedReadyEndpoints: map[types.NamespacedName]int{},
3360 }, {
3361
3362 name: "add a subset",
3363 previousEndpoints: []*discovery.EndpointSlice{namedPort[0], nil},
3364 currentEndpoints: multipleSubsetsWithLocal,
3365 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3366 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3367 {endpoint: "1.1.1.1:11", isLocal: false},
3368 },
3369 },
3370 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3371 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3372 {endpoint: "1.1.1.1:11", isLocal: false},
3373 },
3374 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3375 {endpoint: "1.1.1.2:12", isLocal: true},
3376 },
3377 },
3378 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3379 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
3380 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
3381 },
3382 expectedReadyEndpoints: map[types.NamespacedName]int{
3383 makeNSN("ns1", "ep1"): 1,
3384 },
3385 }, {
3386
3387 name: "remove a subset",
3388 previousEndpoints: multipleSubsets,
3389 currentEndpoints: []*discovery.EndpointSlice{namedPort[0], nil},
3390 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3391 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3392 {endpoint: "1.1.1.1:11", isLocal: false},
3393 },
3394 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3395 {endpoint: "1.1.1.2:12", isLocal: false},
3396 },
3397 },
3398 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3399 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3400 {endpoint: "1.1.1.1:11", isLocal: false},
3401 },
3402 },
3403 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
3404 Endpoint: "1.1.1.2:12",
3405 ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
3406 }},
3407 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3408 expectedReadyEndpoints: map[types.NamespacedName]int{},
3409 }, {
3410
3411 name: "rename a port",
3412 previousEndpoints: namedPort,
3413 currentEndpoints: namedPortRenamed,
3414 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3415 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3416 {endpoint: "1.1.1.1:11", isLocal: false},
3417 },
3418 },
3419 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3420 makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): {
3421 {endpoint: "1.1.1.1:11", isLocal: false},
3422 },
3423 },
3424 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
3425 Endpoint: "1.1.1.1:11",
3426 ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
3427 }},
3428 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
3429 makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
3430 },
3431 expectedReadyEndpoints: map[types.NamespacedName]int{},
3432 }, {
3433
3434 name: "renumber a port",
3435 previousEndpoints: namedPort,
3436 currentEndpoints: namedPortRenumbered,
3437 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3438 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3439 {endpoint: "1.1.1.1:11", isLocal: false},
3440 },
3441 },
3442 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3443 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3444 {endpoint: "1.1.1.1:22", isLocal: false},
3445 },
3446 },
3447 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
3448 Endpoint: "1.1.1.1:11",
3449 ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
3450 }},
3451 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
3452 expectedReadyEndpoints: map[types.NamespacedName]int{},
3453 }, {
3454
3455 name: "complex add and remove",
3456 previousEndpoints: complexBefore,
3457 currentEndpoints: complexAfter,
3458 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{
3459 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3460 {endpoint: "1.1.1.1:11", isLocal: false},
3461 },
3462 makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
3463 {endpoint: "2.2.2.22:22", isLocal: true},
3464 {endpoint: "2.2.2.2:22", isLocal: true},
3465 },
3466 makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): {
3467 {endpoint: "2.2.2.3:23", isLocal: true},
3468 },
3469 makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
3470 {endpoint: "4.4.4.4:44", isLocal: true},
3471 {endpoint: "4.4.4.5:44", isLocal: true},
3472 },
3473 makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP): {
3474 {endpoint: "4.4.4.6:45", isLocal: true},
3475 },
3476 },
3477 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3478 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3479 {endpoint: "1.1.1.11:11", isLocal: false},
3480 {endpoint: "1.1.1.1:11", isLocal: false},
3481 },
3482 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
3483 {endpoint: "1.1.1.2:12", isLocal: false},
3484 },
3485 makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): {
3486 {endpoint: "1.1.1.2:122", isLocal: false},
3487 },
3488 makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): {
3489 {endpoint: "3.3.3.3:33", isLocal: false},
3490 },
3491 makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
3492 {endpoint: "4.4.4.4:44", isLocal: true},
3493 },
3494 },
3495 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
3496 Endpoint: "2.2.2.2:22",
3497 ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
3498 }, {
3499 Endpoint: "2.2.2.22:22",
3500 ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
3501 }, {
3502 Endpoint: "2.2.2.3:23",
3503 ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
3504 }, {
3505 Endpoint: "4.4.4.5:44",
3506 ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
3507 }, {
3508 Endpoint: "4.4.4.6:45",
3509 ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
3510 }},
3511 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
3512 makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
3513 makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
3514 makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
3515 },
3516 expectedReadyEndpoints: map[types.NamespacedName]int{
3517 makeNSN("ns4", "ep4"): 1,
3518 },
3519 }, {
3520
3521 name: "change from 0 endpoint address to 1 named port",
3522 previousEndpoints: emptyEndpointSlices,
3523 currentEndpoints: namedPort,
3524 oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
3525 expectedResult: map[proxy.ServicePortName][]endpointExpectation{
3526 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
3527 {endpoint: "1.1.1.1:11", isLocal: false},
3528 },
3529 },
3530 expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
3531 expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
3532 makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
3533 },
3534 expectedReadyEndpoints: map[types.NamespacedName]int{},
3535 },
3536 }
3537
3538 for tci, tc := range testCases {
3539 t.Run(tc.name, func(t *testing.T) {
3540 ipt := iptablestest.NewFake()
3541 ipvs := ipvstest.NewFake()
3542 ipset := ipsettest.NewFake(testIPSetVersion)
3543 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
3544 fp.hostname = testHostname
3545
3546
3547
3548 for i := range tc.previousEndpoints {
3549 if tc.previousEndpoints[i] != nil {
3550 fp.OnEndpointSliceAdd(tc.previousEndpoints[i])
3551 }
3552 }
3553 fp.endpointsMap.Update(fp.endpointsChanges)
3554 checkEndpointExpectations(t, tci, fp.endpointsMap, tc.oldEndpoints)
3555
3556
3557 if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
3558 t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
3559 }
3560
3561 for i := range tc.previousEndpoints {
3562 prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i]
3563 switch {
3564 case prev == nil:
3565 fp.OnEndpointSliceAdd(curr)
3566 case curr == nil:
3567 fp.OnEndpointSliceDelete(prev)
3568 default:
3569 fp.OnEndpointSliceUpdate(prev, curr)
3570 }
3571 }
3572 result := fp.endpointsMap.Update(fp.endpointsChanges)
3573 newMap := fp.endpointsMap
3574 checkEndpointExpectations(t, tci, newMap, tc.expectedResult)
3575 if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
3576 t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
3577 }
3578 for _, x := range tc.expectedDeletedUDPEndpoints {
3579 found := false
3580 for _, stale := range result.DeletedUDPEndpoints {
3581 if stale == x {
3582 found = true
3583 break
3584 }
3585 }
3586 if !found {
3587 t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
3588 }
3589 }
3590 if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
3591 t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
3592 }
3593 for svcName := range tc.expectedNewlyActiveUDPServices {
3594 found := false
3595 for _, stale := range result.NewlyActiveUDPServices {
3596 if stale == svcName {
3597 found = true
3598 break
3599 }
3600 }
3601 if !found {
3602 t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
3603 }
3604 }
3605 localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
3606 if !reflect.DeepEqual(localReadyEndpoints, tc.expectedReadyEndpoints) {
3607 t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedReadyEndpoints, localReadyEndpoints)
3608 }
3609 })
3610 }
3611 }
3612
3613 type endpointExpectation struct {
3614 endpoint string
3615 isLocal bool
3616 }
3617
3618 func checkEndpointExpectations(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]endpointExpectation) {
3619 if len(newMap) != len(expected) {
3620 t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
3621 }
3622 for x := range expected {
3623 if len(newMap[x]) != len(expected[x]) {
3624 t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
3625 } else {
3626 for i := range expected[x] {
3627 newEp := newMap[x][i]
3628 if newEp.String() != expected[x][i].endpoint ||
3629 newEp.IsLocal() != expected[x][i].isLocal {
3630 t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
3631 }
3632 }
3633 }
3634 }
3635 }
3636
3637 func Test_syncService(t *testing.T) {
3638 testCases := []struct {
3639 oldVirtualServer *utilipvs.VirtualServer
3640 svcName string
3641 newVirtualServer *utilipvs.VirtualServer
3642 bindAddr bool
3643 alreadyBoundAddrs sets.Set[string]
3644 }{
3645 {
3646
3647 oldVirtualServer: &utilipvs.VirtualServer{
3648 Address: netutils.ParseIPSloppy("1.2.3.4"),
3649 Protocol: string(v1.ProtocolTCP),
3650 Port: 80,
3651 Scheduler: "rr",
3652 Flags: utilipvs.FlagHashed,
3653 },
3654 svcName: "foo",
3655 newVirtualServer: &utilipvs.VirtualServer{
3656 Address: netutils.ParseIPSloppy("1.2.3.4"),
3657 Protocol: string(v1.ProtocolTCP),
3658 Port: 80,
3659 Scheduler: "rr",
3660 Flags: utilipvs.FlagHashed,
3661 },
3662 bindAddr: false,
3663 alreadyBoundAddrs: nil,
3664 },
3665 {
3666
3667 oldVirtualServer: &utilipvs.VirtualServer{
3668 Address: netutils.ParseIPSloppy("1.2.3.4"),
3669 Protocol: string(v1.ProtocolTCP),
3670 Port: 8080,
3671 Scheduler: "rr",
3672 Flags: utilipvs.FlagHashed,
3673 },
3674 svcName: "bar",
3675 newVirtualServer: &utilipvs.VirtualServer{
3676 Address: netutils.ParseIPSloppy("1.2.3.4"),
3677 Protocol: string(v1.ProtocolTCP),
3678 Port: 8080,
3679 Scheduler: "rr",
3680 Flags: utilipvs.FlagPersistent,
3681 },
3682 bindAddr: false,
3683 alreadyBoundAddrs: nil,
3684 },
3685 {
3686
3687 oldVirtualServer: &utilipvs.VirtualServer{
3688 Address: netutils.ParseIPSloppy("1.2.3.4"),
3689 Protocol: string(v1.ProtocolTCP),
3690 Port: 8080,
3691 Scheduler: "rr",
3692 Flags: utilipvs.FlagHashed,
3693 },
3694 svcName: "bar",
3695 newVirtualServer: &utilipvs.VirtualServer{
3696 Address: netutils.ParseIPSloppy("1.2.3.4"),
3697 Protocol: string(v1.ProtocolTCP),
3698 Port: 8080,
3699 Scheduler: "wlc",
3700 Flags: utilipvs.FlagHashed,
3701 },
3702 bindAddr: false,
3703 alreadyBoundAddrs: nil,
3704 },
3705 {
3706
3707 oldVirtualServer: nil,
3708 svcName: "baz",
3709 newVirtualServer: &utilipvs.VirtualServer{
3710 Address: netutils.ParseIPSloppy("1.2.3.4"),
3711 Protocol: string(v1.ProtocolUDP),
3712 Port: 53,
3713 Scheduler: "rr",
3714 Flags: utilipvs.FlagHashed,
3715 },
3716 bindAddr: true,
3717 alreadyBoundAddrs: nil,
3718 },
3719 {
3720
3721 oldVirtualServer: &utilipvs.VirtualServer{
3722 Address: netutils.ParseIPSloppy("1.2.3.4"),
3723 Protocol: string(v1.ProtocolSCTP),
3724 Port: 80,
3725 Scheduler: "rr",
3726 Flags: utilipvs.FlagHashed,
3727 },
3728 svcName: "foo",
3729 newVirtualServer: &utilipvs.VirtualServer{
3730 Address: netutils.ParseIPSloppy("1.2.3.4"),
3731 Protocol: string(v1.ProtocolSCTP),
3732 Port: 80,
3733 Scheduler: "rr",
3734 Flags: utilipvs.FlagHashed,
3735 },
3736 bindAddr: false,
3737 alreadyBoundAddrs: nil,
3738 },
3739 {
3740
3741 oldVirtualServer: &utilipvs.VirtualServer{
3742 Address: netutils.ParseIPSloppy("1.2.3.4"),
3743 Protocol: string(v1.ProtocolSCTP),
3744 Port: 8080,
3745 Scheduler: "rr",
3746 Flags: utilipvs.FlagHashed,
3747 },
3748 svcName: "bar",
3749 newVirtualServer: &utilipvs.VirtualServer{
3750 Address: netutils.ParseIPSloppy("1.2.3.4"),
3751 Protocol: string(v1.ProtocolSCTP),
3752 Port: 8080,
3753 Scheduler: "rr",
3754 Flags: utilipvs.FlagPersistent,
3755 },
3756 bindAddr: false,
3757 alreadyBoundAddrs: nil,
3758 },
3759 {
3760
3761 oldVirtualServer: &utilipvs.VirtualServer{
3762 Address: netutils.ParseIPSloppy("1.2.3.4"),
3763 Protocol: string(v1.ProtocolSCTP),
3764 Port: 8080,
3765 Scheduler: "rr",
3766 Flags: utilipvs.FlagHashed,
3767 },
3768 svcName: "bar",
3769 newVirtualServer: &utilipvs.VirtualServer{
3770 Address: netutils.ParseIPSloppy("1.2.3.4"),
3771 Protocol: string(v1.ProtocolSCTP),
3772 Port: 8080,
3773 Scheduler: "wlc",
3774 Flags: utilipvs.FlagHashed,
3775 },
3776 bindAddr: false,
3777 alreadyBoundAddrs: nil,
3778 },
3779 {
3780
3781 oldVirtualServer: nil,
3782 svcName: "baz",
3783 newVirtualServer: &utilipvs.VirtualServer{
3784 Address: netutils.ParseIPSloppy("1.2.3.4"),
3785 Protocol: string(v1.ProtocolSCTP),
3786 Port: 53,
3787 Scheduler: "rr",
3788 Flags: utilipvs.FlagHashed,
3789 },
3790 bindAddr: true,
3791 alreadyBoundAddrs: sets.New[string](),
3792 },
3793 {
3794
3795 oldVirtualServer: &utilipvs.VirtualServer{
3796 Address: netutils.ParseIPSloppy("1.2.3.4"),
3797 Protocol: string(v1.ProtocolSCTP),
3798 Port: 53,
3799 Scheduler: "rr",
3800 Flags: utilipvs.FlagHashed,
3801 },
3802 svcName: "baz",
3803 newVirtualServer: &utilipvs.VirtualServer{
3804 Address: netutils.ParseIPSloppy("1.2.3.4"),
3805 Protocol: string(v1.ProtocolSCTP),
3806 Port: 53,
3807 Scheduler: "rr",
3808 Flags: utilipvs.FlagHashed,
3809 },
3810 bindAddr: true,
3811 alreadyBoundAddrs: sets.New("1.2.3.4"),
3812 },
3813 }
3814
3815 for i := range testCases {
3816 ipt := iptablestest.NewFake()
3817 ipvs := ipvstest.NewFake()
3818 ipset := ipsettest.NewFake(testIPSetVersion)
3819 proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
3820
3821 proxier.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
3822 if testCases[i].oldVirtualServer != nil {
3823 if err := proxier.ipvs.AddVirtualServer(testCases[i].oldVirtualServer); err != nil {
3824 t.Errorf("Case [%d], unexpected add IPVS virtual server error: %v", i, err)
3825 }
3826 }
3827 if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr, testCases[i].alreadyBoundAddrs); err != nil {
3828 t.Errorf("Case [%d], unexpected sync IPVS virtual server error: %v", i, err)
3829 }
3830
3831 list, err := proxier.ipvs.GetVirtualServers()
3832 if err != nil {
3833 t.Errorf("Case [%d], unexpected list IPVS virtual server error: %v", i, err)
3834 }
3835 if len(list) != 1 {
3836 t.Errorf("Case [%d], expect %d virtual servers, got %d", i, 1, len(list))
3837 continue
3838 }
3839 if !list[0].Equal(testCases[i].newVirtualServer) {
3840 t.Errorf("Case [%d], unexpected mismatch, expect: %#v, got: %#v", i, testCases[i].newVirtualServer, list[0])
3841 }
3842 }
3843 }
3844
3845 func buildFakeProxier() (*iptablestest.FakeIPTables, *Proxier) {
3846 ipt := iptablestest.NewFake()
3847 ipvs := ipvstest.NewFake()
3848 ipset := ipsettest.NewFake(testIPSetVersion)
3849 return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
3850 }
3851
3852 func getRules(ipt *iptablestest.FakeIPTables, chain utiliptables.Chain) []*iptablestest.Rule {
3853 var rules []*iptablestest.Rule
3854
3855 buf := bytes.NewBuffer(nil)
3856 _ = ipt.SaveInto(utiliptables.TableNAT, buf)
3857 _ = ipt.SaveInto(utiliptables.TableFilter, buf)
3858 lines := strings.Split(buf.String(), "\n")
3859 for _, l := range lines {
3860 if !strings.HasPrefix(l, "-A ") {
3861 continue
3862 }
3863 rule, _ := iptablestest.ParseRule(l, false)
3864 if rule != nil && rule.Chain == chain {
3865 rules = append(rules, rule)
3866 }
3867 }
3868 return rules
3869 }
3870
3871
3872
3873 func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) {
3874 for epChain, epRules := range epIpt {
3875 rules := getRules(ipt, utiliptables.Chain(epChain))
3876 if len(rules) != len(epRules) {
3877 t.Errorf("Expected %d iptables rule in chain %s, got %d", len(epRules), epChain, len(rules))
3878 continue
3879 }
3880 for i, epRule := range epRules {
3881 rule := rules[i]
3882 if rule.Jump == nil || rule.Jump.Value != epRule.JumpChain {
3883 t.Errorf("Expected MatchSet=%s JumpChain=%s, got %s", epRule.MatchSet, epRule.JumpChain, rule.Raw)
3884 }
3885 if (epRule.MatchSet == "" && rule.MatchSet != nil) || (epRule.MatchSet != "" && (rule.MatchSet == nil || rule.MatchSet.Value != epRule.MatchSet)) {
3886 t.Errorf("Expected MatchSet=%s JumpChain=%s, got %s", epRule.MatchSet, epRule.JumpChain, rule.Raw)
3887 }
3888 }
3889 }
3890 }
3891
3892
3893 func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
3894 for set, entries := range ipSet {
3895 ents, err := fp.ipset.ListEntries(set)
3896 if err != nil || len(ents) != len(entries) {
3897 t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents))
3898 continue
3899 }
3900 expectedEntries := []string{}
3901 for _, entry := range entries {
3902 expectedEntries = append(expectedEntries, entry.String())
3903 }
3904 sort.Strings(ents)
3905 sort.Strings(expectedEntries)
3906 if !reflect.DeepEqual(ents, expectedEntries) {
3907 t.Errorf("Check ipset entries failed for ipset: %q", set)
3908 }
3909 }
3910 }
3911
3912
3913 func checkIPVS(t *testing.T, fp *Proxier, vs *netlinktest.ExpectedVirtualServer) {
3914 t.Helper()
3915 services, err := fp.ipvs.GetVirtualServers()
3916 if err != nil {
3917 t.Errorf("Failed to get ipvs services, err: %v", err)
3918 }
3919 if len(services) != vs.VSNum {
3920 t.Errorf("Expect %d ipvs services, got %d", vs.VSNum, len(services))
3921 }
3922 for _, svc := range services {
3923 if svc.Address.String() == vs.IP && svc.Port == vs.Port && svc.Protocol == vs.Protocol {
3924 destinations, _ := fp.ipvs.GetRealServers(svc)
3925 if len(destinations) != len(vs.RS) {
3926 t.Errorf("Expected %d destinations, got %d destinations", len(vs.RS), len(destinations))
3927 }
3928 if len(vs.RS) == 1 {
3929 if destinations[0].Address.String() != vs.RS[0].IP || destinations[0].Port != vs.RS[0].Port {
3930 t.Errorf("Unexpected mismatch destinations")
3931 }
3932 }
3933 }
3934 }
3935 }
3936
3937 func TestCleanLegacyService(t *testing.T) {
3938 ipt := iptablestest.NewFake()
3939 ipvs := ipvstest.NewFake()
3940 ipset := ipsettest.NewFake(testIPSetVersion)
3941 excludeCIDRs, _ := netutils.ParseCIDRs([]string{"3.3.3.0/24", "4.4.4.0/24"})
3942 fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol)
3943
3944
3945 activeServices := sets.New("ipvs0", "ipvs1")
3946
3947 currentServices := map[string]*utilipvs.VirtualServer{
3948
3949 "ipvs0": {
3950 Address: netutils.ParseIPSloppy("1.1.1.1"),
3951 Protocol: string(v1.ProtocolUDP),
3952 Port: 53,
3953 Scheduler: "rr",
3954 Flags: utilipvs.FlagHashed,
3955 },
3956
3957 "ipvs1": {
3958 Address: netutils.ParseIPSloppy("2.2.2.2"),
3959 Protocol: string(v1.ProtocolUDP),
3960 Port: 54,
3961 Scheduler: "rr",
3962 Flags: utilipvs.FlagHashed,
3963 },
3964
3965 "ipvs2": {
3966 Address: netutils.ParseIPSloppy("3.3.3.3"),
3967 Protocol: string(v1.ProtocolUDP),
3968 Port: 55,
3969 Scheduler: "rr",
3970 Flags: utilipvs.FlagHashed,
3971 },
3972
3973 "ipvs3": {
3974 Address: netutils.ParseIPSloppy("4.4.4.4"),
3975 Protocol: string(v1.ProtocolUDP),
3976 Port: 56,
3977 Scheduler: "rr",
3978 Flags: utilipvs.FlagHashed,
3979 },
3980
3981 "ipvs4": {
3982 Address: netutils.ParseIPSloppy("5.5.5.5"),
3983 Protocol: string(v1.ProtocolUDP),
3984 Port: 57,
3985 Scheduler: "rr",
3986 Flags: utilipvs.FlagHashed,
3987 },
3988
3989 "ipvs5": {
3990 Address: netutils.ParseIPSloppy("6.6.6.6"),
3991 Protocol: string(v1.ProtocolUDP),
3992 Port: 58,
3993 Scheduler: "rr",
3994 Flags: utilipvs.FlagHashed,
3995 },
3996 }
3997 for v := range currentServices {
3998 fp.ipvs.AddVirtualServer(currentServices[v])
3999 }
4000
4001 fp.cleanLegacyService(activeServices, currentServices)
4002
4003 remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
4004 if len(remainingVirtualServers) != 4 {
4005 t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers))
4006 }
4007 for _, vs := range remainingVirtualServers {
4008
4009 if vs.Port == 57 {
4010 t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains")
4011 }
4012 if vs.Port == 58 {
4013 t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
4014 }
4015 }
4016 }
4017
4018 func TestCleanLegacyServiceWithRealServers(t *testing.T) {
4019 ipt := iptablestest.NewFake()
4020 ipvs := ipvstest.NewFake()
4021 ipset := ipsettest.NewFake(testIPSetVersion)
4022 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4023
4024
4025 activeServices := sets.New("ipvs2")
4026
4027 currentServices := map[string]*utilipvs.VirtualServer{
4028 "ipvs0": {
4029 Address: netutils.ParseIPSloppy("1.1.1.1"),
4030 Protocol: string(v1.ProtocolUDP),
4031 Port: 53,
4032 Scheduler: "rr",
4033 Flags: utilipvs.FlagHashed,
4034 },
4035 "ipvs1": {
4036 Address: netutils.ParseIPSloppy("2.2.2.2"),
4037 Protocol: string(v1.ProtocolUDP),
4038 Port: 54,
4039 Scheduler: "rr",
4040 Flags: utilipvs.FlagHashed,
4041 },
4042 "ipvs2": {
4043 Address: netutils.ParseIPSloppy("3.3.3.3"),
4044 Protocol: string(v1.ProtocolUDP),
4045 Port: 54,
4046 Scheduler: "rr",
4047 Flags: utilipvs.FlagHashed,
4048 },
4049 }
4050
4051
4052 realServers := map[*utilipvs.VirtualServer]*utilipvs.RealServer{
4053 {
4054 Address: netutils.ParseIPSloppy("1.1.1.1"),
4055 Protocol: string(v1.ProtocolUDP),
4056 Port: 53,
4057 Scheduler: "rr",
4058 Flags: utilipvs.FlagHashed,
4059 }: {
4060 Address: netutils.ParseIPSloppy("10.180.0.1"),
4061 Port: uint16(53),
4062 Weight: 1,
4063 },
4064 }
4065
4066 for v := range currentServices {
4067 fp.ipvs.AddVirtualServer(currentServices[v])
4068 }
4069
4070 for v, r := range realServers {
4071 fp.ipvs.AddRealServer(v, r)
4072 }
4073
4074 fp.cleanLegacyService(activeServices, currentServices)
4075 remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
4076 if len(remainingVirtualServers) != 1 {
4077 t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 1, len(remainingVirtualServers))
4078 }
4079
4080 if remainingVirtualServers[0] != currentServices["ipvs2"] {
4081 t.Logf("actual virtual server: %v", remainingVirtualServers[0])
4082 t.Logf("expected virtual server: %v", currentServices["ipvs0"])
4083 t.Errorf("unexpected IPVS service")
4084 }
4085 }
4086
4087 func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) {
4088 ipt := iptablestest.NewFake()
4089 ipvs := ipvstest.NewFake()
4090 ipset := ipsettest.NewFake(testIPSetVersion)
4091 gtm := NewGracefulTerminationManager(ipvs)
4092 excludeCIDRs, _ := netutils.ParseCIDRs([]string{"4.4.4.4/32"})
4093 fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol)
4094 fp.gracefuldeleteManager = gtm
4095
4096 vs := &utilipvs.VirtualServer{
4097 Address: netutils.ParseIPSloppy("4.4.4.4"),
4098 Protocol: string(v1.ProtocolUDP),
4099 Port: 56,
4100 Scheduler: "rr",
4101 Flags: utilipvs.FlagHashed,
4102 }
4103
4104 fp.ipvs.AddVirtualServer(vs)
4105
4106 rss := []*utilipvs.RealServer{
4107 {
4108 Address: netutils.ParseIPSloppy("10.10.10.10"),
4109 Port: 56,
4110 ActiveConn: 0,
4111 InactiveConn: 0,
4112 },
4113 {
4114 Address: netutils.ParseIPSloppy("11.11.11.11"),
4115 Port: 56,
4116 ActiveConn: 0,
4117 InactiveConn: 0,
4118 },
4119 }
4120 for _, rs := range rss {
4121 fp.ipvs.AddRealServer(vs, rs)
4122 }
4123
4124 fp.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
4125
4126 fp.netlinkHandle.EnsureAddressBind("4.4.4.4", defaultDummyDevice)
4127
4128 fp.cleanLegacyService(nil, map[string]*utilipvs.VirtualServer{"ipvs0": vs})
4129
4130 fp.gracefuldeleteManager.tryDeleteRs()
4131
4132 remainingRealServers, _ := fp.ipvs.GetRealServers(vs)
4133
4134 if len(remainingRealServers) != 2 {
4135 t.Errorf("Expected number of remaining IPVS real servers after cleanup should be %v. Got %v", 2, len(remainingRealServers))
4136 }
4137 }
4138
4139 func TestCleanLegacyService6(t *testing.T) {
4140 ipt := iptablestest.NewFake()
4141 ipvs := ipvstest.NewFake()
4142 ipset := ipsettest.NewFake(testIPSetVersion)
4143 excludeCIDRs, _ := netutils.ParseCIDRs([]string{"3000::/64", "4000::/64"})
4144 fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv6Protocol)
4145 fp.nodeIP = netutils.ParseIPSloppy("::1")
4146
4147
4148 activeServices := sets.New("ipvs0", "ipvs1")
4149
4150 currentServices := map[string]*utilipvs.VirtualServer{
4151
4152 "ipvs0": {
4153 Address: netutils.ParseIPSloppy("1000::1"),
4154 Protocol: string(v1.ProtocolUDP),
4155 Port: 53,
4156 Scheduler: "rr",
4157 Flags: utilipvs.FlagHashed,
4158 },
4159
4160 "ipvs1": {
4161 Address: netutils.ParseIPSloppy("1000::2"),
4162 Protocol: string(v1.ProtocolUDP),
4163 Port: 54,
4164 Scheduler: "rr",
4165 Flags: utilipvs.FlagHashed,
4166 },
4167
4168 "ipvs2": {
4169 Address: netutils.ParseIPSloppy("3000::1"),
4170 Protocol: string(v1.ProtocolUDP),
4171 Port: 55,
4172 Scheduler: "rr",
4173 Flags: utilipvs.FlagHashed,
4174 },
4175
4176 "ipvs3": {
4177 Address: netutils.ParseIPSloppy("4000::1"),
4178 Protocol: string(v1.ProtocolUDP),
4179 Port: 56,
4180 Scheduler: "rr",
4181 Flags: utilipvs.FlagHashed,
4182 },
4183
4184 "ipvs4": {
4185 Address: netutils.ParseIPSloppy("5000::1"),
4186 Protocol: string(v1.ProtocolUDP),
4187 Port: 57,
4188 Scheduler: "rr",
4189 Flags: utilipvs.FlagHashed,
4190 },
4191
4192 "ipvs5": {
4193 Address: netutils.ParseIPSloppy("1000::6"),
4194 Protocol: string(v1.ProtocolUDP),
4195 Port: 58,
4196 Scheduler: "rr",
4197 Flags: utilipvs.FlagHashed,
4198 },
4199 }
4200 for v := range currentServices {
4201 fp.ipvs.AddVirtualServer(currentServices[v])
4202 }
4203
4204 fp.cleanLegacyService(activeServices, currentServices)
4205
4206 remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
4207 if len(remainingVirtualServers) != 4 {
4208 t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers))
4209 }
4210 for _, vs := range remainingVirtualServers {
4211
4212 if vs.Port == 57 {
4213 t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains")
4214 }
4215 if vs.Port == 58 {
4216 t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
4217 }
4218 }
4219 }
4220
4221 func TestMultiPortServiceBindAddr(t *testing.T) {
4222 ipt := iptablestest.NewFake()
4223 ipvs := ipvstest.NewFake()
4224 ipset := ipsettest.NewFake(testIPSetVersion)
4225 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4226
4227 service1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
4228 svc.Spec.Type = v1.ServiceTypeClusterIP
4229 svc.Spec.ClusterIP = "172.16.55.4"
4230 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0)
4231 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 1235, 0, 0)
4232 })
4233 service2 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
4234 svc.Spec.Type = v1.ServiceTypeClusterIP
4235 svc.Spec.ClusterIP = "172.16.55.4"
4236 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0)
4237 })
4238 service3 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
4239 svc.Spec.Type = v1.ServiceTypeClusterIP
4240 svc.Spec.ClusterIP = "172.16.55.4"
4241 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0)
4242 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 1235, 0, 0)
4243 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 1236, 0, 0)
4244 })
4245
4246 fp.servicesSynced = true
4247
4248
4249 fp.OnServiceAdd(service1)
4250 fp.syncProxyRules()
4251 remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(defaultDummyDevice)
4252
4253 if len(remainingAddrs) != 1 {
4254 t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs))
4255 }
4256 if remainingAddrs[0] != "172.16.55.4" {
4257 t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0])
4258 }
4259
4260
4261 fp.OnServiceUpdate(service1, service2)
4262 fp.syncProxyRules()
4263 remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(defaultDummyDevice)
4264
4265 if len(remainingAddrs) != 1 {
4266 t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs))
4267 } else if remainingAddrs[0] != "172.16.55.4" {
4268 t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0])
4269 }
4270
4271
4272 fp.OnServiceUpdate(service2, service3)
4273 fp.syncProxyRules()
4274 remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(defaultDummyDevice)
4275
4276 if len(remainingAddrs) != 1 {
4277 t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs))
4278 } else if remainingAddrs[0] != "172.16.55.4" {
4279 t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0])
4280 }
4281
4282
4283 fp.OnServiceDelete(service3)
4284 fp.syncProxyRules()
4285 remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(defaultDummyDevice)
4286
4287 if len(remainingAddrs) != 0 {
4288 t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 0, len(remainingAddrs))
4289 }
4290 }
4291
4292 func Test_getFirstColumn(t *testing.T) {
4293 testCases := []struct {
4294 name string
4295 fileContent string
4296 want []string
4297 wantErr bool
4298 }{
4299 {
4300 name: "valid content",
4301 fileContent: `libiscsi_tcp 28672 1 iscsi_tcp, Live 0xffffffffc07ae000
4302 libiscsi 57344 3 ib_iser,iscsi_tcp,libiscsi_tcp, Live 0xffffffffc079a000
4303 raid10 57344 0 - Live 0xffffffffc0597000`,
4304 want: []string{"libiscsi_tcp", "libiscsi", "raid10"},
4305 wantErr: false,
4306 },
4307 }
4308 for _, test := range testCases {
4309 t.Run(test.name, func(t *testing.T) {
4310 got, err := getFirstColumn(strings.NewReader(test.fileContent))
4311 if (err != nil) != test.wantErr {
4312 t.Errorf("getFirstColumn() error = %v, wantErr %v", err, test.wantErr)
4313 return
4314 }
4315 if !reflect.DeepEqual(got, test.want) {
4316 t.Errorf("getFirstColumn() = %v, want %v", got, test.want)
4317 }
4318 })
4319 }
4320 }
4321
4322
4323
4324
4325 func TestEndpointSliceE2E(t *testing.T) {
4326 ipt := iptablestest.NewFake()
4327 ipvs := ipvstest.NewFake()
4328 ipset := ipsettest.NewFake(testIPSetVersion)
4329 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4330 fp.servicesSynced = true
4331 fp.endpointSlicesSynced = true
4332
4333
4334 serviceName := "svc1"
4335 namespaceName := "ns1"
4336 fp.OnServiceAdd(&v1.Service{
4337 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
4338 Spec: v1.ServiceSpec{
4339 ClusterIP: "172.20.1.1",
4340 Selector: map[string]string{"foo": "bar"},
4341 Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}},
4342 },
4343 })
4344
4345
4346 endpointSlice := &discovery.EndpointSlice{
4347 ObjectMeta: metav1.ObjectMeta{
4348 Name: fmt.Sprintf("%s-1", serviceName),
4349 Namespace: namespaceName,
4350 Labels: map[string]string{discovery.LabelServiceName: serviceName},
4351 },
4352 Ports: []discovery.EndpointPort{{
4353 Name: ptr.To(""),
4354 Port: ptr.To[int32](80),
4355 Protocol: ptr.To(v1.ProtocolTCP),
4356 }},
4357 AddressType: discovery.AddressTypeIPv4,
4358 Endpoints: []discovery.Endpoint{{
4359 Addresses: []string{"10.0.1.1"},
4360 Conditions: discovery.EndpointConditions{Ready: ptr.To(true)},
4361 NodeName: ptr.To(testHostname),
4362 }, {
4363 Addresses: []string{"10.0.1.2"},
4364 Conditions: discovery.EndpointConditions{Ready: ptr.To(true)},
4365 NodeName: ptr.To("node2"),
4366 }, {
4367 Addresses: []string{"10.0.1.3"},
4368 Conditions: discovery.EndpointConditions{Ready: ptr.To(true)},
4369 NodeName: ptr.To("node3"),
4370 }, {
4371 Addresses: []string{"10.0.1.4"},
4372 Conditions: discovery.EndpointConditions{Ready: ptr.To(false)},
4373 NodeName: ptr.To("node3"),
4374 }},
4375 }
4376
4377 fp.OnEndpointSliceAdd(endpointSlice)
4378 fp.syncProxyRules()
4379
4380
4381 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
4382 activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
4383 assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-LOOP-BACK")
4384 assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
4385 virtualServers1, vsErr1 := ipvs.GetVirtualServers()
4386 assert.Nil(t, vsErr1, "Expected no error getting virtual servers")
4387 assert.Len(t, virtualServers1, 1, "Expected 1 virtual server")
4388 realServers1, rsErr1 := ipvs.GetRealServers(virtualServers1[0])
4389 assert.Nil(t, rsErr1, "Expected no error getting real servers")
4390 assert.Len(t, realServers1, 3, "Expected 3 real servers")
4391 assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
4392 assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
4393 assert.Equal(t, realServers1[2].String(), "10.0.1.3:80")
4394
4395 fp.OnEndpointSliceDelete(endpointSlice)
4396 fp.syncProxyRules()
4397
4398
4399 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
4400 activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
4401 assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
4402 virtualServers2, vsErr2 := ipvs.GetVirtualServers()
4403 assert.Nil(t, vsErr2, "Expected no error getting virtual servers")
4404 assert.Len(t, virtualServers2, 1, "Expected 1 virtual server")
4405 realServers2, rsErr2 := ipvs.GetRealServers(virtualServers2[0])
4406 assert.Nil(t, rsErr2, "Expected no error getting real servers")
4407 assert.Len(t, realServers2, 0, "Expected 0 real servers")
4408 }
4409
4410 func TestHealthCheckNodePortE2E(t *testing.T) {
4411 ipt := iptablestest.NewFake()
4412 ipvs := ipvstest.NewFake()
4413 ipset := ipsettest.NewFake(testIPSetVersion)
4414 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4415 fp.servicesSynced = true
4416 fp.endpointSlicesSynced = true
4417
4418
4419 serviceName := "svc1"
4420 namespaceName := "ns1"
4421
4422 svc := v1.Service{
4423 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
4424 Spec: v1.ServiceSpec{
4425 ClusterIP: "172.20.1.1",
4426 Selector: map[string]string{"foo": "bar"},
4427 Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}},
4428 Type: "LoadBalancer",
4429 HealthCheckNodePort: 30000,
4430 ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
4431 },
4432 }
4433 fp.OnServiceAdd(&svc)
4434 fp.syncProxyRules()
4435
4436
4437 assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"])
4438 activeEntries1 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries
4439 assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-HEALTH-CHECK-NODE-PORT")
4440 assert.Equal(t, true, activeEntries1.Has("30000"), "Expected activeEntries to reference hc node port in spec")
4441
4442
4443 newSvc := svc
4444 newSvc.Spec.HealthCheckNodePort = 30001
4445 fp.OnServiceUpdate(&svc, &newSvc)
4446 fp.syncProxyRules()
4447
4448
4449 assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"])
4450 activeEntries2 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries
4451 assert.Equal(t, 1, activeEntries2.Len(), "Expected 1 active entry in KUBE-HEALTH-CHECK-NODE-PORT")
4452 assert.Equal(t, true, activeEntries2.Has("30001"), "Expected activeEntries to reference updated hc node port in spec")
4453
4454 fp.OnServiceDelete(&svc)
4455 fp.syncProxyRules()
4456
4457
4458 assert.NotNil(t, fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"])
4459 activeEntries3 := fp.ipsetList["KUBE-HEALTH-CHECK-NODE-PORT"].activeEntries
4460 assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-HEALTH-CHECK-NODE-PORT")
4461 }
4462
4463
4464 func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
4465 ipt := iptablestest.NewFake()
4466 ipvs := ipvstest.NewFake()
4467 ipset := ipsettest.NewFake(testIPSetVersion)
4468 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4469 fp.servicesSynced = true
4470
4471 fp.endpointSlicesSynced = true
4472
4473 serviceName := "svc1"
4474 namespaceName := "ns1"
4475
4476 fp.OnServiceAdd(&v1.Service{
4477 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
4478 Spec: v1.ServiceSpec{
4479 ClusterIP: "172.20.1.1",
4480 Selector: map[string]string{"foo": "bar"},
4481 Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}},
4482 },
4483 })
4484
4485 endpointSlice := &discovery.EndpointSlice{
4486 ObjectMeta: metav1.ObjectMeta{
4487 Name: fmt.Sprintf("%s-1", serviceName),
4488 Namespace: namespaceName,
4489 Labels: map[string]string{discovery.LabelServiceName: serviceName},
4490 },
4491 Ports: []discovery.EndpointPort{{
4492 Name: ptr.To(""),
4493 Port: ptr.To[int32](80),
4494 Protocol: ptr.To(v1.ProtocolTCP),
4495 }},
4496 AddressType: discovery.AddressTypeIPv4,
4497 Endpoints: []discovery.Endpoint{{
4498 Addresses: []string{"10.0.1.1"},
4499 Conditions: discovery.EndpointConditions{Ready: ptr.To(true)},
4500 NodeName: ptr.To(testHostname),
4501 }, {
4502 Addresses: []string{"10.0.1.2"},
4503 Conditions: discovery.EndpointConditions{Ready: ptr.To(true)},
4504 NodeName: ptr.To(testHostname),
4505 }, {
4506 Addresses: []string{"10.0.1.3"},
4507 Conditions: discovery.EndpointConditions{Ready: ptr.To(true)},
4508 }, {
4509 Addresses: []string{"10.0.1.4"},
4510 Conditions: discovery.EndpointConditions{Ready: ptr.To(false)},
4511 NodeName: ptr.To(testHostname),
4512 }},
4513 }
4514
4515 fp.OnEndpointSliceAdd(endpointSlice)
4516 _ = fp.endpointsMap.Update(fp.endpointsChanges)
4517 localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
4518 if len(localReadyEndpoints) != 1 {
4519 t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(localReadyEndpoints))
4520 }
4521
4522
4523 endpointSliceTerminating := &discovery.EndpointSlice{
4524 ObjectMeta: metav1.ObjectMeta{
4525 Name: fmt.Sprintf("%s-1", serviceName),
4526 Namespace: namespaceName,
4527 Labels: map[string]string{discovery.LabelServiceName: serviceName},
4528 },
4529 Ports: []discovery.EndpointPort{{
4530 Name: ptr.To(""),
4531 Port: ptr.To[int32](80),
4532 Protocol: ptr.To(v1.ProtocolTCP),
4533 }},
4534 AddressType: discovery.AddressTypeIPv4,
4535 Endpoints: []discovery.Endpoint{{
4536 Addresses: []string{"10.0.1.1"},
4537 Conditions: discovery.EndpointConditions{
4538 Ready: ptr.To(false),
4539 Serving: ptr.To(true),
4540 Terminating: ptr.To(false),
4541 },
4542 NodeName: ptr.To(testHostname),
4543 }, {
4544 Addresses: []string{"10.0.1.2"},
4545 Conditions: discovery.EndpointConditions{
4546 Ready: ptr.To(false),
4547 Serving: ptr.To(true),
4548 Terminating: ptr.To(true),
4549 },
4550 NodeName: ptr.To(testHostname),
4551 }, {
4552 Addresses: []string{"10.0.1.3"},
4553 Conditions: discovery.EndpointConditions{
4554 Ready: ptr.To(false),
4555 Serving: ptr.To(true),
4556 Terminating: ptr.To(true),
4557 },
4558 NodeName: ptr.To(testHostname),
4559 }, {
4560 Addresses: []string{"10.0.1.4"},
4561 Conditions: discovery.EndpointConditions{
4562 Ready: ptr.To(false),
4563 Serving: ptr.To(false),
4564 Terminating: ptr.To(true),
4565 },
4566 NodeName: ptr.To(testHostname),
4567 }},
4568 }
4569
4570 fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating)
4571 _ = fp.endpointsMap.Update(fp.endpointsChanges)
4572 localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints()
4573 if len(localReadyEndpoints) != 0 {
4574 t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(localReadyEndpoints))
4575 }
4576 }
4577
4578 func TestFilterCIDRs(t *testing.T) {
4579 var cidrList []string
4580 var cidrs []string
4581 var expected []string
4582 cidrs = filterCIDRs(true, []string{})
4583 if len(cidrs) > 0 {
4584 t.Errorf("An empty list produces a non-empty return %v", cidrs)
4585 }
4586
4587 cidrList = []string{"1000::/64", "10.0.0.0/16", "11.0.0.0/16", "2000::/64"}
4588 expected = []string{"1000::/64", "2000::/64"}
4589 cidrs = filterCIDRs(true, cidrList)
4590 if !reflect.DeepEqual(cidrs, expected) {
4591 t.Errorf("cidrs %v is not expected %v", cidrs, expected)
4592 }
4593
4594 expected = []string{"10.0.0.0/16", "11.0.0.0/16"}
4595 cidrs = filterCIDRs(false, cidrList)
4596 if !reflect.DeepEqual(cidrs, expected) {
4597 t.Errorf("cidrs %v is not expected %v", cidrs, expected)
4598 }
4599
4600 cidrList = []string{"1000::/64", "2000::/64"}
4601 expected = []string{}
4602 cidrs = filterCIDRs(false, cidrList)
4603 if len(cidrs) > 0 {
4604 t.Errorf("cidrs %v is not expected %v", cidrs, expected)
4605 }
4606 }
4607
4608 func TestCreateAndLinkKubeChain(t *testing.T) {
4609 ipt := iptablestest.NewFake()
4610 ipvs := ipvstest.NewFake()
4611 ipset := ipsettest.NewFake(testIPSetVersion)
4612 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4613 fp.createAndLinkKubeChain()
4614 expectedNATChains := `:KUBE-SERVICES - [0:0]
4615 :KUBE-POSTROUTING - [0:0]
4616 :KUBE-NODE-PORT - [0:0]
4617 :KUBE-LOAD-BALANCER - [0:0]
4618 :KUBE-MARK-MASQ - [0:0]
4619 `
4620 expectedFilterChains := `:KUBE-FORWARD - [0:0]
4621 :KUBE-NODE-PORT - [0:0]
4622 :KUBE-PROXY-FIREWALL - [0:0]
4623 :KUBE-SOURCE-RANGES-FIREWALL - [0:0]
4624 :KUBE-IPVS-FILTER - [0:0]
4625 :KUBE-IPVS-OUT-FILTER - [0:0]
4626 `
4627 assert.Equal(t, expectedNATChains, fp.natChains.String())
4628 assert.Equal(t, expectedFilterChains, fp.filterChains.String())
4629 }
4630
4631
4632
4633 func TestTestInternalTrafficPolicyE2E(t *testing.T) {
4634 type endpoint struct {
4635 ip string
4636 hostname string
4637 }
4638
4639 testCases := []struct {
4640 name string
4641 internalTrafficPolicy *v1.ServiceInternalTrafficPolicy
4642 endpoints []endpoint
4643 expectVirtualServer bool
4644 expectLocalEntries bool
4645 expectLocalRealServerNum int
4646 expectLocalRealServers []string
4647 }{
4648 {
4649 name: "internalTrafficPolicy is cluster with non-zero local endpoints",
4650 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster),
4651 endpoints: []endpoint{
4652 {"10.0.1.1", testHostname},
4653 {"10.0.1.2", "host1"},
4654 {"10.0.1.3", "host2"},
4655 },
4656 expectVirtualServer: true,
4657 expectLocalEntries: true,
4658 expectLocalRealServerNum: 3,
4659 expectLocalRealServers: []string{
4660 "10.0.1.1:80",
4661 "10.0.1.2:80",
4662 "10.0.1.3:80",
4663 },
4664 },
4665 {
4666 name: "internalTrafficPolicy is cluster with zero local endpoints",
4667 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster),
4668 endpoints: []endpoint{
4669 {"10.0.1.1", "host0"},
4670 {"10.0.1.2", "host1"},
4671 {"10.0.1.3", "host2"},
4672 },
4673 expectVirtualServer: false,
4674 expectLocalEntries: false,
4675 expectLocalRealServerNum: 3,
4676 expectLocalRealServers: []string{
4677 "10.0.1.1:80",
4678 "10.0.1.2:80",
4679 "10.0.1.3:80",
4680 },
4681 },
4682 {
4683 name: "internalTrafficPolicy is local with non-zero local endpoints",
4684 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal),
4685 endpoints: []endpoint{
4686 {"10.0.1.1", testHostname},
4687 {"10.0.1.2", "host1"},
4688 {"10.0.1.3", "host2"},
4689 },
4690 expectVirtualServer: true,
4691 expectLocalEntries: true,
4692 expectLocalRealServerNum: 1,
4693 expectLocalRealServers: []string{
4694 "10.0.1.1:80",
4695 },
4696 },
4697 {
4698 name: "internalTrafficPolicy is local with zero local endpoints",
4699 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal),
4700 endpoints: []endpoint{
4701 {"10.0.1.1", "host0"},
4702 {"10.0.1.2", "host1"},
4703 {"10.0.1.3", "host2"},
4704 },
4705 expectVirtualServer: false,
4706 expectLocalEntries: false,
4707 expectLocalRealServerNum: 0,
4708 expectLocalRealServers: []string{},
4709 },
4710 }
4711 for _, tc := range testCases {
4712 ipt := iptablestest.NewFake()
4713 ipvs := ipvstest.NewFake()
4714 ipset := ipsettest.NewFake(testIPSetVersion)
4715 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4716 fp.servicesSynced = true
4717
4718 fp.endpointSlicesSynced = true
4719
4720
4721 serviceName := "svc1"
4722 namespaceName := "ns1"
4723
4724 svc := &v1.Service{
4725 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
4726 Spec: v1.ServiceSpec{
4727 ClusterIP: "172.20.1.1",
4728 Selector: map[string]string{"foo": "bar"},
4729 Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP}},
4730 },
4731 }
4732 if tc.internalTrafficPolicy != nil {
4733 svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy
4734 }
4735
4736 fp.OnServiceAdd(svc)
4737
4738
4739 endpointSlice := &discovery.EndpointSlice{
4740 ObjectMeta: metav1.ObjectMeta{
4741 Name: fmt.Sprintf("%s-1", serviceName),
4742 Namespace: namespaceName,
4743 Labels: map[string]string{discovery.LabelServiceName: serviceName},
4744 },
4745 Ports: []discovery.EndpointPort{{
4746 Name: ptr.To(""),
4747 Port: ptr.To[int32](80),
4748 Protocol: ptr.To(v1.ProtocolTCP),
4749 }},
4750 AddressType: discovery.AddressTypeIPv4,
4751 }
4752
4753 for _, ep := range tc.endpoints {
4754 endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{
4755 Addresses: []string{ep.ip},
4756 Conditions: discovery.EndpointConditions{Ready: ptr.To(true)},
4757 NodeName: ptr.To(ep.hostname),
4758 })
4759 }
4760
4761 fp.OnEndpointSliceAdd(endpointSlice)
4762 fp.syncProxyRules()
4763
4764
4765 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
4766
4767 activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
4768
4769 if tc.expectLocalEntries {
4770 assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-LOOP-BACK")
4771 } else {
4772 assert.Equal(t, 0, activeEntries1.Len(), "Expected no active entry in KUBE-LOOP-BACK")
4773 }
4774
4775 if tc.expectVirtualServer {
4776 virtualServers1, vsErr1 := ipvs.GetVirtualServers()
4777 assert.Nil(t, vsErr1, "Expected no error getting virtual servers")
4778
4779 assert.Len(t, virtualServers1, 1, "Expected 1 virtual server")
4780 realServers1, rsErr1 := ipvs.GetRealServers(virtualServers1[0])
4781 assert.Nil(t, rsErr1, "Expected no error getting real servers")
4782
4783 assert.Len(t, realServers1, tc.expectLocalRealServerNum, fmt.Sprintf("Expected %d real servers", tc.expectLocalRealServerNum))
4784 for i := 0; i < tc.expectLocalRealServerNum; i++ {
4785 assert.Equal(t, realServers1[i].String(), tc.expectLocalRealServers[i])
4786 }
4787 }
4788
4789 fp.OnEndpointSliceDelete(endpointSlice)
4790 fp.syncProxyRules()
4791
4792
4793 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
4794 activeEntries3 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
4795 assert.Equal(t, 0, activeEntries3.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
4796 virtualServers2, vsErr2 := ipvs.GetVirtualServers()
4797 assert.Nil(t, vsErr2, "Expected no error getting virtual servers")
4798 assert.Len(t, virtualServers2, 1, "Expected 1 virtual server")
4799 realServers2, rsErr2 := ipvs.GetRealServers(virtualServers2[0])
4800 assert.Nil(t, rsErr2, "Expected no error getting real servers")
4801 assert.Len(t, realServers2, 0, "Expected 0 real servers")
4802 }
4803 }
4804
4805
4806
4807 func Test_EndpointSliceReadyAndTerminatingCluster(t *testing.T) {
4808
4809 ipt := iptablestest.NewFake()
4810 ipvs := ipvstest.NewFake()
4811 ipset := ipsettest.NewFake(testIPSetVersion)
4812 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4813 fp.servicesSynced = true
4814
4815 fp.endpointSlicesSynced = true
4816
4817 serviceName := "svc1"
4818
4819 namespaceName := "ns1"
4820 fp.OnServiceAdd(&v1.Service{
4821 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
4822 Spec: v1.ServiceSpec{
4823 ClusterIP: "172.20.1.1",
4824 Selector: map[string]string{"foo": "bar"},
4825 Type: v1.ServiceTypeNodePort,
4826 ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyCluster,
4827 InternalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster),
4828 ExternalIPs: []string{
4829 "1.2.3.4",
4830 },
4831 Ports: []v1.ServicePort{
4832 {
4833 Name: "",
4834 Port: 80,
4835 TargetPort: intstr.FromInt32(80),
4836 Protocol: v1.ProtocolTCP,
4837 },
4838 },
4839 },
4840 })
4841
4842
4843 endpointSlice := &discovery.EndpointSlice{
4844 ObjectMeta: metav1.ObjectMeta{
4845 Name: fmt.Sprintf("%s-1", serviceName),
4846 Namespace: namespaceName,
4847 Labels: map[string]string{discovery.LabelServiceName: serviceName},
4848 },
4849 Ports: []discovery.EndpointPort{{
4850 Name: ptr.To(""),
4851 Port: ptr.To[int32](80),
4852 Protocol: ptr.To(v1.ProtocolTCP),
4853 }},
4854 AddressType: discovery.AddressTypeIPv4,
4855 Endpoints: []discovery.Endpoint{
4856 {
4857 Addresses: []string{"10.0.1.1"},
4858 Conditions: discovery.EndpointConditions{
4859 Ready: ptr.To(true),
4860 Serving: ptr.To(true),
4861 Terminating: ptr.To(false),
4862 },
4863 NodeName: ptr.To(testHostname),
4864 },
4865 {
4866 Addresses: []string{"10.0.1.2"},
4867 Conditions: discovery.EndpointConditions{
4868 Ready: ptr.To(true),
4869 Serving: ptr.To(true),
4870 Terminating: ptr.To(false),
4871 },
4872 NodeName: ptr.To(testHostname),
4873 },
4874 {
4875 Addresses: []string{"10.0.1.3"},
4876 Conditions: discovery.EndpointConditions{
4877 Ready: ptr.To(false),
4878 Serving: ptr.To(true),
4879 Terminating: ptr.To(true),
4880 },
4881 NodeName: ptr.To(testHostname),
4882 },
4883 {
4884 Addresses: []string{"10.0.1.4"},
4885 Conditions: discovery.EndpointConditions{
4886 Ready: ptr.To(false),
4887 Serving: ptr.To(false),
4888 Terminating: ptr.To(true),
4889 },
4890 NodeName: ptr.To(testHostname),
4891 },
4892 {
4893 Addresses: []string{"10.0.1.5"},
4894 Conditions: discovery.EndpointConditions{
4895 Ready: ptr.To(true),
4896 Serving: ptr.To(true),
4897 Terminating: ptr.To(false),
4898 },
4899 NodeName: ptr.To("another-host"),
4900 },
4901 },
4902 }
4903
4904 fp.OnEndpointSliceAdd(endpointSlice)
4905 fp.syncProxyRules()
4906
4907
4908 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
4909 activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
4910 assert.Equal(t, 4, activeEntries1.Len(), "Expected 4 active entry in KUBE-LOOP-BACK")
4911 assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first pod")
4912 assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second pod")
4913 assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference third pod")
4914 assert.Equal(t, true, activeEntries1.Has("10.0.1.4,tcp:80,10.0.1.4"), "Expected activeEntries to reference fourth pod")
4915
4916 virtualServers, vsErr := ipvs.GetVirtualServers()
4917 assert.Nil(t, vsErr, "Expected no error getting virtual servers")
4918 assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
4919
4920 var clusterIPServer, externalIPServer *utilipvs.VirtualServer
4921 for _, virtualServer := range virtualServers {
4922 if virtualServer.Address.String() == "172.20.1.1" {
4923 clusterIPServer = virtualServer
4924 }
4925
4926 if virtualServer.Address.String() == "1.2.3.4" {
4927 externalIPServer = virtualServer
4928 }
4929 }
4930
4931
4932 realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
4933 assert.Nil(t, rsErr1, "Expected no error getting real servers")
4934 assert.Len(t, realServers1, 3, "Expected 3 real servers")
4935 assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
4936 assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
4937 assert.Equal(t, realServers1[2].String(), "10.0.1.5:80")
4938
4939
4940 realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
4941 assert.Nil(t, rsErr2, "Expected no error getting real servers")
4942 assert.Len(t, realServers2, 3, "Expected 3 real servers")
4943 assert.Equal(t, realServers2[0].String(), "10.0.1.1:80")
4944 assert.Equal(t, realServers2[1].String(), "10.0.1.2:80")
4945 assert.Equal(t, realServers1[2].String(), "10.0.1.5:80")
4946
4947 fp.OnEndpointSliceDelete(endpointSlice)
4948 fp.syncProxyRules()
4949
4950
4951 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
4952 activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
4953 assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
4954
4955 virtualServers, vsErr = ipvs.GetVirtualServers()
4956 assert.Nil(t, vsErr, "Expected no error getting virtual servers")
4957 assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
4958
4959 for _, virtualServer := range virtualServers {
4960 if virtualServer.Address.String() == "172.20.1.1" {
4961 clusterIPServer = virtualServer
4962 }
4963
4964 if virtualServer.Address.String() == "1.2.3.4" {
4965 externalIPServer = virtualServer
4966 }
4967 }
4968
4969 realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
4970 assert.Nil(t, rsErr1, "Expected no error getting real servers")
4971 assert.Len(t, realServers1, 0, "Expected 0 real servers")
4972
4973 realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
4974 assert.Nil(t, rsErr2, "Expected no error getting real servers")
4975 assert.Len(t, realServers2, 0, "Expected 0 real servers")
4976 }
4977
4978
4979
4980 func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) {
4981
4982 ipt := iptablestest.NewFake()
4983 ipvs := ipvstest.NewFake()
4984 ipset := ipsettest.NewFake(testIPSetVersion)
4985 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
4986 fp.servicesSynced = true
4987
4988 fp.endpointSlicesSynced = true
4989
4990 serviceName := "svc1"
4991
4992 namespaceName := "ns1"
4993 fp.OnServiceAdd(&v1.Service{
4994 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
4995 Spec: v1.ServiceSpec{
4996 ClusterIP: "172.20.1.1",
4997 Selector: map[string]string{"foo": "bar"},
4998 Type: v1.ServiceTypeNodePort,
4999 ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
5000 InternalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster),
5001 ExternalIPs: []string{
5002 "1.2.3.4",
5003 },
5004 Ports: []v1.ServicePort{
5005 {
5006 Name: "",
5007 Port: 80,
5008 TargetPort: intstr.FromInt32(80),
5009 Protocol: v1.ProtocolTCP,
5010 },
5011 },
5012 },
5013 })
5014
5015
5016 endpointSlice := &discovery.EndpointSlice{
5017 ObjectMeta: metav1.ObjectMeta{
5018 Name: fmt.Sprintf("%s-1", serviceName),
5019 Namespace: namespaceName,
5020 Labels: map[string]string{discovery.LabelServiceName: serviceName},
5021 },
5022 Ports: []discovery.EndpointPort{{
5023 Name: ptr.To(""),
5024 Port: ptr.To[int32](80),
5025 Protocol: ptr.To(v1.ProtocolTCP),
5026 }},
5027 AddressType: discovery.AddressTypeIPv4,
5028 Endpoints: []discovery.Endpoint{
5029 {
5030 Addresses: []string{"10.0.1.1"},
5031 Conditions: discovery.EndpointConditions{
5032 Ready: ptr.To(true),
5033 Serving: ptr.To(true),
5034 Terminating: ptr.To(false),
5035 },
5036 NodeName: ptr.To(testHostname),
5037 },
5038 {
5039 Addresses: []string{"10.0.1.2"},
5040 Conditions: discovery.EndpointConditions{
5041 Ready: ptr.To(true),
5042 Serving: ptr.To(true),
5043 Terminating: ptr.To(false),
5044 },
5045 NodeName: ptr.To(testHostname),
5046 },
5047 {
5048 Addresses: []string{"10.0.1.3"},
5049 Conditions: discovery.EndpointConditions{
5050 Ready: ptr.To(false),
5051 Serving: ptr.To(true),
5052 Terminating: ptr.To(true),
5053 },
5054 NodeName: ptr.To(testHostname),
5055 },
5056 {
5057 Addresses: []string{"10.0.1.4"},
5058 Conditions: discovery.EndpointConditions{
5059 Ready: ptr.To(false),
5060 Serving: ptr.To(false),
5061 Terminating: ptr.To(true),
5062 },
5063 NodeName: ptr.To(testHostname),
5064 },
5065 {
5066 Addresses: []string{"10.0.1.5"},
5067 Conditions: discovery.EndpointConditions{
5068 Ready: ptr.To(true),
5069 Serving: ptr.To(true),
5070 Terminating: ptr.To(false),
5071 },
5072 NodeName: ptr.To("another-host"),
5073 },
5074 },
5075 }
5076
5077 fp.OnEndpointSliceAdd(endpointSlice)
5078 fp.syncProxyRules()
5079
5080
5081 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
5082 activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
5083 assert.Equal(t, 4, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK")
5084 assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
5085 assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod")
5086 assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod")
5087 assert.Equal(t, true, activeEntries1.Has("10.0.1.4,tcp:80,10.0.1.4"), "Expected activeEntries to reference second (local) pod")
5088
5089 virtualServers, vsErr := ipvs.GetVirtualServers()
5090 assert.Nil(t, vsErr, "Expected no error getting virtual servers")
5091 assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
5092
5093 var clusterIPServer, externalIPServer *utilipvs.VirtualServer
5094 for _, virtualServer := range virtualServers {
5095 if virtualServer.Address.String() == "172.20.1.1" {
5096 clusterIPServer = virtualServer
5097 }
5098
5099 if virtualServer.Address.String() == "1.2.3.4" {
5100 externalIPServer = virtualServer
5101 }
5102 }
5103
5104
5105 realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
5106 assert.Nil(t, rsErr1, "Expected no error getting real servers")
5107 assert.Len(t, realServers1, 3, "Expected 3 real servers")
5108 assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
5109 assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
5110 assert.Equal(t, realServers1[2].String(), "10.0.1.5:80")
5111
5112
5113 realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
5114 assert.Nil(t, rsErr2, "Expected no error getting real servers")
5115 assert.Len(t, realServers2, 2, "Expected 2 real servers")
5116 assert.Equal(t, realServers2[0].String(), "10.0.1.1:80")
5117 assert.Equal(t, realServers2[1].String(), "10.0.1.2:80")
5118
5119 fp.OnEndpointSliceDelete(endpointSlice)
5120 fp.syncProxyRules()
5121
5122
5123 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
5124 activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
5125 assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
5126
5127 virtualServers, vsErr = ipvs.GetVirtualServers()
5128 assert.Nil(t, vsErr, "Expected no error getting virtual servers")
5129 assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
5130
5131 for _, virtualServer := range virtualServers {
5132 if virtualServer.Address.String() == "172.20.1.1" {
5133 clusterIPServer = virtualServer
5134 }
5135
5136 if virtualServer.Address.String() == "1.2.3.4" {
5137 externalIPServer = virtualServer
5138 }
5139 }
5140
5141 realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
5142 assert.Nil(t, rsErr1, "Expected no error getting real servers")
5143 assert.Len(t, realServers1, 0, "Expected 0 real servers")
5144
5145 realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
5146 assert.Nil(t, rsErr2, "Expected no error getting real servers")
5147 assert.Len(t, realServers2, 0, "Expected 0 real servers")
5148 }
5149
5150
5151
5152 func Test_EndpointSliceOnlyReadyAndTerminatingCluster(t *testing.T) {
5153
5154 ipt := iptablestest.NewFake()
5155 ipvs := ipvstest.NewFake()
5156 ipset := ipsettest.NewFake(testIPSetVersion)
5157 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
5158 fp.servicesSynced = true
5159
5160 fp.endpointSlicesSynced = true
5161
5162
5163 serviceName := "svc1"
5164 namespaceName := "ns1"
5165 fp.OnServiceAdd(&v1.Service{
5166 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
5167 Spec: v1.ServiceSpec{
5168 ClusterIP: "172.20.1.1",
5169 Selector: map[string]string{"foo": "bar"},
5170 Type: v1.ServiceTypeNodePort,
5171 ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyCluster,
5172 InternalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster),
5173 ExternalIPs: []string{
5174 "1.2.3.4",
5175 },
5176 Ports: []v1.ServicePort{
5177 {
5178 Name: "",
5179 Port: 80,
5180 TargetPort: intstr.FromInt32(80),
5181 Protocol: v1.ProtocolTCP,
5182 },
5183 },
5184 },
5185 })
5186
5187
5188 endpointSlice := &discovery.EndpointSlice{
5189 ObjectMeta: metav1.ObjectMeta{
5190 Name: fmt.Sprintf("%s-1", serviceName),
5191 Namespace: namespaceName,
5192 Labels: map[string]string{discovery.LabelServiceName: serviceName},
5193 },
5194 Ports: []discovery.EndpointPort{{
5195 Name: ptr.To(""),
5196 Port: ptr.To[int32](80),
5197 Protocol: ptr.To(v1.ProtocolTCP),
5198 }},
5199 AddressType: discovery.AddressTypeIPv4,
5200 Endpoints: []discovery.Endpoint{
5201 {
5202 Addresses: []string{"10.0.1.1"},
5203 Conditions: discovery.EndpointConditions{
5204 Ready: ptr.To(false),
5205 Serving: ptr.To(true),
5206 Terminating: ptr.To(true),
5207 },
5208 NodeName: ptr.To(testHostname),
5209 },
5210 {
5211 Addresses: []string{"10.0.1.2"},
5212 Conditions: discovery.EndpointConditions{
5213 Ready: ptr.To(false),
5214 Serving: ptr.To(true),
5215 Terminating: ptr.To(true),
5216 },
5217 NodeName: ptr.To(testHostname),
5218 },
5219 {
5220 Addresses: []string{"10.0.1.3"},
5221 Conditions: discovery.EndpointConditions{
5222 Ready: ptr.To(false),
5223 Serving: ptr.To(false),
5224 Terminating: ptr.To(true),
5225 },
5226 NodeName: ptr.To(testHostname),
5227 },
5228 {
5229 Addresses: []string{"10.0.1.4"},
5230 Conditions: discovery.EndpointConditions{
5231 Ready: ptr.To(false),
5232 Serving: ptr.To(true),
5233 Terminating: ptr.To(true),
5234 },
5235 NodeName: ptr.To("another-host"),
5236 },
5237 {
5238 Addresses: []string{"10.0.1.5"},
5239 Conditions: discovery.EndpointConditions{
5240 Ready: ptr.To(false),
5241 Serving: ptr.To(false),
5242 Terminating: ptr.To(false),
5243 },
5244 NodeName: ptr.To("another-host"),
5245 },
5246 },
5247 }
5248
5249 fp.OnEndpointSliceAdd(endpointSlice)
5250 fp.syncProxyRules()
5251
5252
5253 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
5254 activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
5255 assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK")
5256 assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
5257 assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod")
5258 assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod")
5259
5260 virtualServers, vsErr := ipvs.GetVirtualServers()
5261 assert.Nil(t, vsErr, "Expected no error getting virtual servers")
5262 assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
5263
5264 var clusterIPServer, externalIPServer *utilipvs.VirtualServer
5265 for _, virtualServer := range virtualServers {
5266 if virtualServer.Address.String() == "172.20.1.1" {
5267 clusterIPServer = virtualServer
5268 }
5269
5270 if virtualServer.Address.String() == "1.2.3.4" {
5271 externalIPServer = virtualServer
5272 }
5273 }
5274
5275
5276 realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
5277 assert.Nil(t, rsErr1, "Expected no error getting real servers")
5278 assert.Len(t, realServers1, 3, "Expected 1 real servers")
5279 assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
5280 assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
5281 assert.Equal(t, realServers1[2].String(), "10.0.1.4:80")
5282
5283
5284 realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
5285 assert.Nil(t, rsErr2, "Expected no error getting real servers")
5286 assert.Len(t, realServers2, 3, "Expected 2 real servers")
5287 assert.Equal(t, realServers2[0].String(), "10.0.1.1:80")
5288 assert.Equal(t, realServers2[1].String(), "10.0.1.2:80")
5289 assert.Equal(t, realServers2[2].String(), "10.0.1.4:80")
5290
5291 fp.OnEndpointSliceDelete(endpointSlice)
5292 fp.syncProxyRules()
5293
5294
5295 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
5296 activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
5297 assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
5298
5299 virtualServers, vsErr = ipvs.GetVirtualServers()
5300 assert.Nil(t, vsErr, "Expected no error getting virtual servers")
5301 assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
5302
5303 for _, virtualServer := range virtualServers {
5304 if virtualServer.Address.String() == "172.20.1.1" {
5305 clusterIPServer = virtualServer
5306 }
5307
5308 if virtualServer.Address.String() == "1.2.3.4" {
5309 externalIPServer = virtualServer
5310 }
5311 }
5312
5313 realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
5314 assert.Nil(t, rsErr1, "Expected no error getting real servers")
5315 assert.Len(t, realServers1, 0, "Expected 0 real servers")
5316
5317 realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
5318 assert.Nil(t, rsErr2, "Expected no error getting real servers")
5319 assert.Len(t, realServers2, 0, "Expected 0 real servers")
5320 }
5321
5322
5323
5324 func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) {
5325
5326 ipt := iptablestest.NewFake()
5327 ipvs := ipvstest.NewFake()
5328 ipset := ipsettest.NewFake(testIPSetVersion)
5329 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
5330 fp.servicesSynced = true
5331
5332 fp.endpointSlicesSynced = true
5333
5334
5335 serviceName := "svc1"
5336 namespaceName := "ns1"
5337 fp.OnServiceAdd(&v1.Service{
5338 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
5339 Spec: v1.ServiceSpec{
5340 ClusterIP: "172.20.1.1",
5341 Selector: map[string]string{"foo": "bar"},
5342 Type: v1.ServiceTypeNodePort,
5343 ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
5344 InternalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyCluster),
5345 ExternalIPs: []string{
5346 "1.2.3.4",
5347 },
5348 Ports: []v1.ServicePort{
5349 {
5350 Name: "",
5351 Port: 80,
5352 TargetPort: intstr.FromInt32(80),
5353 Protocol: v1.ProtocolTCP,
5354 },
5355 },
5356 },
5357 })
5358
5359
5360 endpointSlice := &discovery.EndpointSlice{
5361 ObjectMeta: metav1.ObjectMeta{
5362 Name: fmt.Sprintf("%s-1", serviceName),
5363 Namespace: namespaceName,
5364 Labels: map[string]string{discovery.LabelServiceName: serviceName},
5365 },
5366 Ports: []discovery.EndpointPort{{
5367 Name: ptr.To(""),
5368 Port: ptr.To[int32](80),
5369 Protocol: ptr.To(v1.ProtocolTCP),
5370 }},
5371 AddressType: discovery.AddressTypeIPv4,
5372 Endpoints: []discovery.Endpoint{
5373 {
5374 Addresses: []string{"10.0.1.1"},
5375 Conditions: discovery.EndpointConditions{
5376 Ready: ptr.To(false),
5377 Serving: ptr.To(true),
5378 Terminating: ptr.To(true),
5379 },
5380 NodeName: ptr.To(testHostname),
5381 },
5382 {
5383 Addresses: []string{"10.0.1.2"},
5384 Conditions: discovery.EndpointConditions{
5385 Ready: ptr.To(false),
5386 Serving: ptr.To(true),
5387 Terminating: ptr.To(true),
5388 },
5389 NodeName: ptr.To(testHostname),
5390 },
5391 {
5392 Addresses: []string{"10.0.1.3"},
5393 Conditions: discovery.EndpointConditions{
5394 Ready: ptr.To(false),
5395 Serving: ptr.To(false),
5396 Terminating: ptr.To(true),
5397 },
5398 NodeName: ptr.To(testHostname),
5399 },
5400 {
5401 Addresses: []string{"10.0.1.4"},
5402 Conditions: discovery.EndpointConditions{
5403 Ready: ptr.To(false),
5404 Serving: ptr.To(true),
5405 Terminating: ptr.To(true),
5406 },
5407 NodeName: ptr.To("another-host"),
5408 },
5409 {
5410 Addresses: []string{"10.0.1.5"},
5411 Conditions: discovery.EndpointConditions{
5412 Ready: ptr.To(true),
5413 Serving: ptr.To(true),
5414 Terminating: ptr.To(false),
5415 },
5416 NodeName: ptr.To("another-host"),
5417 },
5418 },
5419 }
5420
5421 fp.OnEndpointSliceAdd(endpointSlice)
5422 fp.syncProxyRules()
5423
5424
5425 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
5426 activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
5427 assert.Equal(t, 3, activeEntries1.Len(), "Expected 3 active entry in KUBE-LOOP-BACK")
5428 assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
5429 assert.Equal(t, true, activeEntries1.Has("10.0.1.2,tcp:80,10.0.1.2"), "Expected activeEntries to reference second (local) pod")
5430 assert.Equal(t, true, activeEntries1.Has("10.0.1.3,tcp:80,10.0.1.3"), "Expected activeEntries to reference second (local) pod")
5431
5432 virtualServers, vsErr := ipvs.GetVirtualServers()
5433 assert.Nil(t, vsErr, "Expected no error getting virtual servers")
5434 assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
5435
5436 var clusterIPServer, externalIPServer *utilipvs.VirtualServer
5437 for _, virtualServer := range virtualServers {
5438 if virtualServer.Address.String() == "172.20.1.1" {
5439 clusterIPServer = virtualServer
5440 }
5441
5442 if virtualServer.Address.String() == "1.2.3.4" {
5443 externalIPServer = virtualServer
5444 }
5445 }
5446
5447
5448 realServers1, rsErr1 := ipvs.GetRealServers(clusterIPServer)
5449 assert.Nil(t, rsErr1, "Expected no error getting real servers")
5450 assert.Len(t, realServers1, 1, "Expected 1 real servers")
5451 assert.Equal(t, realServers1[0].String(), "10.0.1.5:80")
5452
5453
5454 realServers2, rsErr2 := ipvs.GetRealServers(externalIPServer)
5455 assert.Nil(t, rsErr2, "Expected no error getting real servers")
5456 assert.Len(t, realServers2, 2, "Expected 2 real servers")
5457 assert.Equal(t, realServers2[0].String(), "10.0.1.1:80")
5458 assert.Equal(t, realServers2[1].String(), "10.0.1.2:80")
5459
5460 fp.OnEndpointSliceDelete(endpointSlice)
5461 fp.syncProxyRules()
5462
5463
5464 assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
5465 activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
5466 assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
5467
5468 virtualServers, vsErr = ipvs.GetVirtualServers()
5469 assert.Nil(t, vsErr, "Expected no error getting virtual servers")
5470 assert.Len(t, virtualServers, 2, "Expected 2 virtual server")
5471
5472 for _, virtualServer := range virtualServers {
5473 if virtualServer.Address.String() == "172.20.1.1" {
5474 clusterIPServer = virtualServer
5475 }
5476
5477 if virtualServer.Address.String() == "1.2.3.4" {
5478 externalIPServer = virtualServer
5479 }
5480 }
5481
5482 realServers1, rsErr1 = ipvs.GetRealServers(clusterIPServer)
5483 assert.Nil(t, rsErr1, "Expected no error getting real servers")
5484 assert.Len(t, realServers1, 0, "Expected 0 real servers")
5485
5486 realServers2, rsErr2 = ipvs.GetRealServers(externalIPServer)
5487 assert.Nil(t, rsErr2, "Expected no error getting real servers")
5488 assert.Len(t, realServers2, 0, "Expected 0 real servers")
5489 }
5490
5491 func TestIpIsValidForSet(t *testing.T) {
5492 testCases := []struct {
5493 isIPv6 bool
5494 ip string
5495 res bool
5496 }{
5497 {
5498 false,
5499 "127.0.0.1",
5500 false,
5501 },
5502 {
5503 false,
5504 "127.0.0.0",
5505 false,
5506 },
5507 {
5508 false,
5509 "127.6.7.8",
5510 false,
5511 },
5512 {
5513 false,
5514 "8.8.8.8",
5515 true,
5516 },
5517 {
5518 false,
5519 "192.168.0.1",
5520 true,
5521 },
5522 {
5523 false,
5524 "169.254.0.0",
5525 true,
5526 },
5527 {
5528 false,
5529 "::ffff:169.254.0.0",
5530 true,
5531 },
5532 {
5533 false,
5534 "1000::",
5535 false,
5536 },
5537
5538 {
5539 true,
5540 "::1",
5541 false,
5542 },
5543 {
5544 true,
5545 "1000::",
5546 true,
5547 },
5548 {
5549 true,
5550 "fe80::200:ff:fe01:1",
5551 false,
5552 },
5553 {
5554 true,
5555 "8.8.8.8",
5556 false,
5557 },
5558 {
5559 true,
5560 "::ffff:8.8.8.8",
5561 false,
5562 },
5563 }
5564
5565 for _, tc := range testCases {
5566 v := &netlinkHandle{}
5567 v.isIPv6 = tc.isIPv6
5568 ip := netutils.ParseIPSloppy(tc.ip)
5569 if ip == nil {
5570 t.Errorf("Parse error: %s", tc.ip)
5571 }
5572 if v.isValidForSet(ip) != tc.res {
5573 if tc.isIPv6 {
5574 t.Errorf("IPv6: %s", tc.ip)
5575 } else {
5576 t.Errorf("IPv4: %s", tc.ip)
5577 }
5578 }
5579 }
5580 }
5581
5582 func TestNoEndpointsMetric(t *testing.T) {
5583 type endpoint struct {
5584 ip string
5585 hostname string
5586 }
5587
5588 metrics.RegisterMetrics()
5589
5590 testCases := []struct {
5591 name string
5592 internalTrafficPolicy *v1.ServiceInternalTrafficPolicy
5593 externalTrafficPolicy v1.ServiceExternalTrafficPolicy
5594 endpoints []endpoint
5595 expectedSyncProxyRulesNoLocalEndpointsTotalInternal int
5596 expectedSyncProxyRulesNoLocalEndpointsTotalExternal int
5597 }{
5598 {
5599 name: "internalTrafficPolicy is set and there are local endpoints",
5600 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal),
5601 endpoints: []endpoint{
5602 {"10.0.1.1", testHostname},
5603 {"10.0.1.2", "host1"},
5604 {"10.0.1.3", "host2"},
5605 },
5606 },
5607 {
5608 name: "externalTrafficPolicy is set and there are local endpoints",
5609 externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
5610 endpoints: []endpoint{
5611 {"10.0.1.1", testHostname},
5612 {"10.0.1.2", "host1"},
5613 {"10.0.1.3", "host2"},
5614 },
5615 },
5616 {
5617 name: "both policies are set and there are local endpoints",
5618 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal),
5619 externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
5620 endpoints: []endpoint{
5621 {"10.0.1.1", testHostname},
5622 {"10.0.1.2", "host1"},
5623 {"10.0.1.3", "host2"},
5624 },
5625 },
5626 {
5627 name: "internalTrafficPolicy is set and there are no local endpoints",
5628 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal),
5629 endpoints: []endpoint{
5630 {"10.0.1.1", "host0"},
5631 {"10.0.1.2", "host1"},
5632 {"10.0.1.3", "host2"},
5633 },
5634 expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1,
5635 },
5636 {
5637 name: "externalTrafficPolicy is set and there are no local endpoints",
5638 externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
5639 endpoints: []endpoint{
5640 {"10.0.1.1", "host0"},
5641 {"10.0.1.2", "host1"},
5642 {"10.0.1.3", "host2"},
5643 },
5644 expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1,
5645 },
5646 {
5647 name: "Both policies are set and there are no local endpoints",
5648 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal),
5649 externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
5650 endpoints: []endpoint{
5651 {"10.0.1.1", "host0"},
5652 {"10.0.1.2", "host1"},
5653 {"10.0.1.3", "host2"},
5654 },
5655 expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1,
5656 expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1,
5657 },
5658 {
5659 name: "Both policies are set and there are no endpoints at all",
5660 internalTrafficPolicy: ptr.To(v1.ServiceInternalTrafficPolicyLocal),
5661 externalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
5662 endpoints: []endpoint{},
5663 expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 0,
5664 expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 0,
5665 },
5666 }
5667 for _, tc := range testCases {
5668 ipt := iptablestest.NewFake()
5669 ipvs := ipvstest.NewFake()
5670 ipset := ipsettest.NewFake(testIPSetVersion)
5671 fp := NewFakeProxier(ipt, ipvs, ipset, []string{"10.0.0.1"}, nil, v1.IPv4Protocol)
5672 fp.servicesSynced = true
5673
5674 fp.endpointSlicesSynced = true
5675
5676
5677 serviceName := "svc1"
5678 namespaceName := "ns1"
5679
5680 svc := &v1.Service{
5681 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
5682 Spec: v1.ServiceSpec{
5683 ClusterIP: "172.20.1.1",
5684 Selector: map[string]string{"foo": "bar"},
5685 Ports: []v1.ServicePort{{Name: "p80", Port: 80, TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP, NodePort: 30000}},
5686 },
5687 }
5688 if tc.internalTrafficPolicy != nil {
5689 svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy
5690 }
5691 if tc.externalTrafficPolicy != "" {
5692 svc.Spec.Type = v1.ServiceTypeNodePort
5693 svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy
5694 }
5695
5696 fp.OnServiceAdd(svc)
5697
5698
5699 endpointSlice := &discovery.EndpointSlice{
5700 ObjectMeta: metav1.ObjectMeta{
5701 Name: fmt.Sprintf("%s-1", serviceName),
5702 Namespace: namespaceName,
5703 Labels: map[string]string{discovery.LabelServiceName: serviceName},
5704 },
5705 Ports: []discovery.EndpointPort{{
5706 Name: ptr.To("p80"),
5707 Port: ptr.To[int32](80),
5708 Protocol: ptr.To(v1.ProtocolTCP),
5709 }},
5710 AddressType: discovery.AddressTypeIPv4,
5711 }
5712
5713 for _, ep := range tc.endpoints {
5714 endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{
5715 Addresses: []string{ep.ip},
5716 Conditions: discovery.EndpointConditions{Ready: ptr.To(true)},
5717 NodeName: ptr.To(ep.hostname),
5718 })
5719 }
5720
5721 fp.OnEndpointSliceAdd(endpointSlice)
5722 fp.syncProxyRules()
5723
5724 syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal"))
5725 if err != nil {
5726 t.Errorf("failed to get %s value(internal), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err)
5727 }
5728
5729 if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) {
5730 t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal)
5731 }
5732
5733 syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external"))
5734 if err != nil {
5735 t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err)
5736 }
5737
5738 if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) {
5739 t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(external): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal)
5740 }
5741 }
5742 }
5743
5744 func TestDismissLocalhostRuleExist(t *testing.T) {
5745 tests := []struct {
5746 name string
5747 ipFamily v1.IPFamily
5748 src string
5749 }{
5750 {
5751 name: "ipv4 rule",
5752 ipFamily: v1.IPv4Protocol,
5753 src: "127.0.0.0/8",
5754 },
5755 {
5756 name: "ipv6 rule",
5757 ipFamily: v1.IPv6Protocol,
5758 src: "::1/128",
5759 },
5760 }
5761 for _, test := range tests {
5762 t.Run(test.name, func(t *testing.T) {
5763 ipt := iptablestest.NewFake()
5764 if test.ipFamily == v1.IPv6Protocol {
5765 ipt = iptablestest.NewIPv6Fake()
5766 }
5767 ipvs := ipvstest.NewFake()
5768 ipset := ipsettest.NewFake(testIPSetVersion)
5769 fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, test.ipFamily)
5770
5771 fp.syncProxyRules()
5772
5773 rules := getRules(ipt, kubeServicesChain)
5774 if len(rules) <= 0 {
5775 t.Errorf("skip loop back ip in kubeservice chain not exist")
5776 return
5777 }
5778 if !rules[0].Jump.Matches("RETURN") || !rules[0].SourceAddress.Matches(test.src) {
5779 t.Errorf("rules not match, expect jump: %s, got: %s; expect source address: %s, got: %s", "RETURN", rules[0].Jump.String(), test.src, rules[0].SourceAddress.String())
5780 }
5781 })
5782 }
5783 }
5784
5785 func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
5786 testCases := []struct {
5787 name string
5788 ipModeEnabled bool
5789 svcIP string
5790 svcLBIP string
5791 ipMode *v1.LoadBalancerIPMode
5792 expectedServices int
5793 }{
5794
5795 {
5796 name: "LoadBalancerIPMode disabled, ipMode Proxy",
5797 ipModeEnabled: false,
5798 svcIP: "10.20.30.41",
5799 svcLBIP: "1.2.3.4",
5800 ipMode: ptr.To(v1.LoadBalancerIPModeProxy),
5801 expectedServices: 2,
5802 },
5803 {
5804 name: "LoadBalancerIPMode disabled, ipMode VIP",
5805 ipModeEnabled: false,
5806 svcIP: "10.20.30.42",
5807 svcLBIP: "1.2.3.5",
5808 ipMode: ptr.To(v1.LoadBalancerIPModeVIP),
5809 expectedServices: 2,
5810 },
5811 {
5812 name: "LoadBalancerIPMode disabled, ipMode nil",
5813 ipModeEnabled: false,
5814 svcIP: "10.20.30.43",
5815 svcLBIP: "1.2.3.6",
5816 ipMode: nil,
5817 expectedServices: 2,
5818 },
5819
5820 {
5821 name: "LoadBalancerIPMode enabled, ipMode Proxy",
5822 ipModeEnabled: true,
5823 svcIP: "10.20.30.41",
5824 svcLBIP: "1.2.3.4",
5825 ipMode: ptr.To(v1.LoadBalancerIPModeProxy),
5826 expectedServices: 1,
5827 },
5828 {
5829 name: "LoadBalancerIPMode enabled, ipMode VIP",
5830 ipModeEnabled: true,
5831 svcIP: "10.20.30.42",
5832 svcLBIP: "1.2.3.5",
5833 ipMode: ptr.To(v1.LoadBalancerIPModeVIP),
5834 expectedServices: 2,
5835 },
5836 {
5837 name: "LoadBalancerIPMode enabled, ipMode nil",
5838 ipModeEnabled: true,
5839 svcIP: "10.20.30.43",
5840 svcLBIP: "1.2.3.6",
5841 ipMode: nil,
5842 expectedServices: 2,
5843 },
5844 }
5845
5846 svcPort := 80
5847 svcNodePort := 3001
5848 svcPortName := proxy.ServicePortName{
5849 NamespacedName: makeNSN("ns1", "svc1"),
5850 Port: "p80",
5851 }
5852
5853 for _, testCase := range testCases {
5854 t.Run(testCase.name, func(t *testing.T) {
5855 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)()
5856 _, fp := buildFakeProxier()
5857 makeServiceMap(fp,
5858 makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
5859 svc.Spec.Type = "LoadBalancer"
5860 svc.Spec.ClusterIP = testCase.svcIP
5861 svc.Spec.Ports = []v1.ServicePort{{
5862 Name: svcPortName.Port,
5863 Port: int32(svcPort),
5864 Protocol: v1.ProtocolTCP,
5865 NodePort: int32(svcNodePort),
5866 }}
5867 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
5868 IP: testCase.svcLBIP,
5869 IPMode: testCase.ipMode,
5870 }}
5871 }),
5872 )
5873
5874 makeEndpointSliceMap(fp,
5875 makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
5876 eps.AddressType = discovery.AddressTypeIPv4
5877 eps.Endpoints = []discovery.Endpoint{{
5878 Addresses: []string{"10.180.0.1"},
5879 }}
5880 eps.Ports = []discovery.EndpointPort{{
5881 Name: ptr.To("p80"),
5882 Port: ptr.To[int32](80),
5883 Protocol: ptr.To(v1.ProtocolTCP),
5884 }}
5885 }),
5886 )
5887
5888 fp.syncProxyRules()
5889
5890 services, err := fp.ipvs.GetVirtualServers()
5891 if err != nil {
5892 t.Errorf("Failed to get ipvs services, err: %v", err)
5893 }
5894 if len(services) != testCase.expectedServices {
5895 t.Errorf("Expected %d ipvs services, got %d", testCase.expectedServices, len(services))
5896 }
5897 })
5898 }
5899 }
5900
View as plain text