1
16
17 package proxy
18
19 import (
20 "net"
21 "reflect"
22 "testing"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/types"
28 "k8s.io/apimachinery/pkg/util/dump"
29 "k8s.io/apimachinery/pkg/util/intstr"
30 utilfeature "k8s.io/apiserver/pkg/util/feature"
31 featuregatetesting "k8s.io/component-base/featuregate/testing"
32 "k8s.io/kubernetes/pkg/features"
33 netutils "k8s.io/utils/net"
34 )
35
36 const testHostname = "test-hostname"
37
38 func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServicePortInfo)) *BaseServicePortInfo {
39 bsvcPortInfo := &BaseServicePortInfo{
40 clusterIP: netutils.ParseIPSloppy(clusterIP),
41 port: port,
42 protocol: v1.Protocol(protocol),
43 }
44 if healthcheckNodePort != 0 {
45 bsvcPortInfo.healthCheckNodePort = healthcheckNodePort
46 }
47 for _, svcInfoFunc := range svcInfoFuncs {
48 svcInfoFunc(bsvcPortInfo)
49 }
50 return bsvcPortInfo
51 }
52
53 func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
54 svc := &v1.Service{
55 ObjectMeta: metav1.ObjectMeta{
56 Name: name,
57 Namespace: namespace,
58 Annotations: map[string]string{},
59 },
60 Spec: v1.ServiceSpec{},
61 Status: v1.ServiceStatus{},
62 }
63 svcFunc(svc)
64 return svc
65 }
66
67 func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort {
68 svcPort := v1.ServicePort{
69 Name: name,
70 Protocol: protocol,
71 Port: port,
72 NodePort: nodeport,
73 TargetPort: intstr.FromInt32(int32(targetPort)),
74 }
75 return append(array, svcPort)
76 }
77
78 func makeNSN(namespace, name string) types.NamespacedName {
79 return types.NamespacedName{Namespace: namespace, Name: name}
80 }
81
82 func makeServicePortName(ns, name, port string, protocol v1.Protocol) ServicePortName {
83 return ServicePortName{
84 NamespacedName: makeNSN(ns, name),
85 Port: port,
86 Protocol: protocol,
87 }
88 }
89 func makeIPs(ipStr ...string) []net.IP {
90 var ips []net.IP
91 for _, s := range ipStr {
92 ips = append(ips, netutils.ParseIPSloppy(s))
93 }
94 return ips
95 }
96 func mustMakeCIDRs(cidrStr ...string) []*net.IPNet {
97 var cidrs []*net.IPNet
98 for _, s := range cidrStr {
99 if _, n, err := netutils.ParseCIDRSloppy(s); err == nil {
100 cidrs = append(cidrs, n)
101 } else {
102 panic(err)
103 }
104 }
105 return cidrs
106 }
107
108 func TestServiceToServiceMap(t *testing.T) {
109 testClusterIPv4 := "10.0.0.1"
110 testExternalIPv4 := "8.8.8.8"
111 testSourceRangeIPv4 := "0.0.0.0/1"
112 testClusterIPv6 := "2001:db8:85a3:0:0:8a2e:370:7334"
113 testExternalIPv6 := "2001:db8:85a3:0:0:8a2e:370:7335"
114 testSourceRangeIPv6 := "2001:db8::/32"
115 ipModeVIP := v1.LoadBalancerIPModeVIP
116 ipModeProxy := v1.LoadBalancerIPModeProxy
117
118 testCases := []struct {
119 desc string
120 service *v1.Service
121 expected map[ServicePortName]*BaseServicePortInfo
122 ipFamily v1.IPFamily
123 ipModeEnabled bool
124 }{
125 {
126 desc: "nothing",
127 ipFamily: v1.IPv4Protocol,
128
129 service: nil,
130 expected: map[ServicePortName]*BaseServicePortInfo{},
131 },
132 {
133 desc: "headless service",
134 ipFamily: v1.IPv4Protocol,
135
136 service: makeTestService("ns2", "headless", func(svc *v1.Service) {
137 svc.Spec.Type = v1.ServiceTypeClusterIP
138 svc.Spec.ClusterIP = v1.ClusterIPNone
139 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
140 }),
141 expected: map[ServicePortName]*BaseServicePortInfo{},
142 },
143 {
144 desc: "headless sctp service",
145 ipFamily: v1.IPv4Protocol,
146
147 service: makeTestService("ns2", "headless", func(svc *v1.Service) {
148 svc.Spec.Type = v1.ServiceTypeClusterIP
149 svc.Spec.ClusterIP = v1.ClusterIPNone
150 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 7777, 0, 0)
151 }),
152 expected: map[ServicePortName]*BaseServicePortInfo{},
153 },
154 {
155 desc: "headless service without port",
156 ipFamily: v1.IPv4Protocol,
157
158 service: makeTestService("ns2", "headless-without-port", func(svc *v1.Service) {
159 svc.Spec.Type = v1.ServiceTypeClusterIP
160 svc.Spec.ClusterIP = v1.ClusterIPNone
161 }),
162 expected: map[ServicePortName]*BaseServicePortInfo{},
163 },
164 {
165 desc: "cluster ip service",
166 ipFamily: v1.IPv4Protocol,
167
168 service: makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
169 svc.Spec.Type = v1.ServiceTypeClusterIP
170 svc.Spec.ClusterIP = "172.16.55.4"
171 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0)
172 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "UDP", 1235, 5321, 0)
173 }),
174 expected: map[ServicePortName]*BaseServicePortInfo{
175 makeServicePortName("ns2", "cluster-ip", "p1", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.4", 1234, "UDP", 0),
176 makeServicePortName("ns2", "cluster-ip", "p2", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.4", 1235, "UDP", 0),
177 },
178 },
179 {
180 desc: "nodeport service",
181 ipFamily: v1.IPv4Protocol,
182
183 service: makeTestService("ns2", "node-port", func(svc *v1.Service) {
184 svc.Spec.Type = v1.ServiceTypeNodePort
185 svc.Spec.ClusterIP = "172.16.55.10"
186 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "UDP", 345, 678, 0)
187 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 344, 677, 0)
188 }),
189 expected: map[ServicePortName]*BaseServicePortInfo{
190 makeServicePortName("ns2", "node-port", "port1", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.10", 345, "UDP", 0),
191 makeServicePortName("ns2", "node-port", "port2", v1.ProtocolTCP): makeTestServiceInfo("172.16.55.10", 344, "TCP", 0),
192 },
193 },
194 {
195 desc: "load balancer service",
196 ipFamily: v1.IPv4Protocol,
197
198 service: makeTestService("ns1", "load-balancer", func(svc *v1.Service) {
199 svc.Spec.Type = v1.ServiceTypeLoadBalancer
200 svc.Spec.ClusterIP = "172.16.55.11"
201 svc.Spec.LoadBalancerIP = "5.6.7.8"
202 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 8675, 30061, 7000)
203 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port4", "UDP", 8676, 30062, 7001)
204 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}}
205 }),
206 expected: map[ServicePortName]*BaseServicePortInfo{
207 makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
208 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.4")
209 }),
210 makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
211 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.4")
212 }),
213 },
214 },
215 {
216 desc: "load balancer service ipMode VIP feature gate disable",
217 ipFamily: v1.IPv4Protocol,
218
219 service: makeTestService("ns1", "load-balancer", func(svc *v1.Service) {
220 svc.Spec.Type = v1.ServiceTypeLoadBalancer
221 svc.Spec.ClusterIP = "172.16.55.11"
222 svc.Spec.LoadBalancerIP = "5.6.7.8"
223 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 8675, 30061, 7000)
224 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port4", "UDP", 8676, 30062, 7001)
225 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4", IPMode: &ipModeVIP}}
226 }),
227 expected: map[ServicePortName]*BaseServicePortInfo{
228 makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
229 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.4")
230 }),
231 makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
232 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.4")
233 }),
234 },
235 },
236 {
237 desc: "load balancer service ipMode Proxy feature gate disable",
238 ipFamily: v1.IPv4Protocol,
239
240 service: makeTestService("ns1", "load-balancer", func(svc *v1.Service) {
241 svc.Spec.Type = v1.ServiceTypeLoadBalancer
242 svc.Spec.ClusterIP = "172.16.55.11"
243 svc.Spec.LoadBalancerIP = "5.6.7.8"
244 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 8675, 30061, 7000)
245 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port4", "UDP", 8676, 30062, 7001)
246 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4", IPMode: &ipModeProxy}}
247 }),
248 expected: map[ServicePortName]*BaseServicePortInfo{
249 makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
250 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.4")
251 }),
252 makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
253 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.4")
254 }),
255 },
256 },
257 {
258 desc: "load balancer service ipMode VIP feature gate enabled",
259 ipFamily: v1.IPv4Protocol,
260 ipModeEnabled: true,
261
262 service: makeTestService("ns1", "load-balancer", func(svc *v1.Service) {
263 svc.Spec.Type = v1.ServiceTypeLoadBalancer
264 svc.Spec.ClusterIP = "172.16.55.11"
265 svc.Spec.LoadBalancerIP = "5.6.7.8"
266 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 8675, 30061, 7000)
267 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port4", "UDP", 8676, 30062, 7001)
268 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4", IPMode: &ipModeVIP}}
269 }),
270 expected: map[ServicePortName]*BaseServicePortInfo{
271 makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
272 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.4")
273 }),
274 makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
275 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.4")
276 }),
277 },
278 },
279 {
280 desc: "load balancer service ipMode Proxy feature gate enabled",
281 ipFamily: v1.IPv4Protocol,
282 ipModeEnabled: true,
283
284 service: makeTestService("ns1", "load-balancer", func(svc *v1.Service) {
285 svc.Spec.Type = v1.ServiceTypeLoadBalancer
286 svc.Spec.ClusterIP = "172.16.55.11"
287 svc.Spec.LoadBalancerIP = "5.6.7.8"
288 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 8675, 30061, 7000)
289 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port4", "UDP", 8676, 30062, 7001)
290 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4", IPMode: &ipModeProxy}}
291 }),
292 expected: map[ServicePortName]*BaseServicePortInfo{
293 makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
294 }),
295 makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
296 }),
297 },
298 },
299 {
300 desc: "load balancer service with only local traffic policy",
301 ipFamily: v1.IPv4Protocol,
302
303 service: makeTestService("ns1", "only-local-load-balancer", func(svc *v1.Service) {
304 svc.Spec.Type = v1.ServiceTypeLoadBalancer
305 svc.Spec.ClusterIP = "172.16.55.12"
306 svc.Spec.LoadBalancerIP = "5.6.7.8"
307 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "portx", "UDP", 8677, 30063, 7002)
308 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "porty", "UDP", 8678, 30064, 7003)
309 svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}}
310 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
311 svc.Spec.HealthCheckNodePort = 345
312 }),
313 expected: map[ServicePortName]*BaseServicePortInfo{
314 makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) {
315 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.3")
316 }),
317 makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) {
318 bsvcPortInfo.loadBalancerVIPs = makeIPs("10.1.2.3")
319 }),
320 },
321 },
322 {
323 desc: "external name service",
324 ipFamily: v1.IPv4Protocol,
325
326 service: makeTestService("ns2", "external-name", func(svc *v1.Service) {
327 svc.Spec.Type = v1.ServiceTypeExternalName
328 svc.Spec.ClusterIP = "172.16.55.4"
329 svc.Spec.ExternalName = "foo2.bar.com"
330 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "portz", "UDP", 1235, 5321, 0)
331 }),
332 expected: map[ServicePortName]*BaseServicePortInfo{},
333 },
334 {
335 desc: "service with ipv6 clusterIP under ipv4 mode, service should be filtered",
336 ipFamily: v1.IPv4Protocol,
337
338 service: &v1.Service{
339 ObjectMeta: metav1.ObjectMeta{
340 Name: "invalidIPv6InIPV4Mode",
341 Namespace: "test",
342 },
343 Spec: v1.ServiceSpec{
344 ClusterIP: testClusterIPv6,
345 Ports: []v1.ServicePort{
346 {
347 Name: "testPort",
348 Port: int32(12345),
349 Protocol: v1.ProtocolTCP,
350 },
351 },
352 },
353 Status: v1.ServiceStatus{
354 LoadBalancer: v1.LoadBalancerStatus{
355 Ingress: []v1.LoadBalancerIngress{
356 {IP: testExternalIPv4},
357 {IP: testExternalIPv6},
358 },
359 },
360 },
361 },
362 },
363 {
364 desc: "service with ipv4 clusterIP under ipv6 mode, service should be filtered",
365 ipFamily: v1.IPv6Protocol,
366
367 service: &v1.Service{
368 ObjectMeta: metav1.ObjectMeta{
369 Name: "invalidIPv4InIPV6Mode",
370 Namespace: "test",
371 },
372 Spec: v1.ServiceSpec{
373 ClusterIP: testClusterIPv4,
374 Ports: []v1.ServicePort{
375 {
376 Name: "testPort",
377 Port: int32(12345),
378 Protocol: v1.ProtocolTCP,
379 },
380 },
381 },
382 Status: v1.ServiceStatus{
383 LoadBalancer: v1.LoadBalancerStatus{
384 Ingress: []v1.LoadBalancerIngress{
385 {IP: testExternalIPv4},
386 {IP: testExternalIPv6},
387 },
388 },
389 },
390 },
391 },
392 {
393 desc: "service with ipv4 configurations under ipv4 mode",
394 ipFamily: v1.IPv4Protocol,
395
396 service: &v1.Service{
397 ObjectMeta: metav1.ObjectMeta{
398 Name: "validIPv4",
399 Namespace: "test",
400 },
401 Spec: v1.ServiceSpec{
402 ClusterIP: testClusterIPv4,
403 ExternalIPs: []string{testExternalIPv4},
404 LoadBalancerSourceRanges: []string{testSourceRangeIPv4},
405 Ports: []v1.ServicePort{
406 {
407 Name: "testPort",
408 Port: int32(12345),
409 Protocol: v1.ProtocolTCP,
410 },
411 },
412 },
413 Status: v1.ServiceStatus{
414 LoadBalancer: v1.LoadBalancerStatus{
415 Ingress: []v1.LoadBalancerIngress{
416 {IP: testExternalIPv4},
417 {IP: testExternalIPv6},
418 },
419 },
420 },
421 },
422 expected: map[ServicePortName]*BaseServicePortInfo{
423 makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
424 bsvcPortInfo.externalIPs = makeIPs(testExternalIPv4)
425 bsvcPortInfo.loadBalancerSourceRanges = mustMakeCIDRs(testSourceRangeIPv4)
426 bsvcPortInfo.loadBalancerVIPs = makeIPs(testExternalIPv4)
427 }),
428 },
429 },
430 {
431 desc: "service with ipv6 configurations under ipv6 mode",
432 ipFamily: v1.IPv6Protocol,
433
434 service: &v1.Service{
435 ObjectMeta: metav1.ObjectMeta{
436 Name: "validIPv6",
437 Namespace: "test",
438 },
439 Spec: v1.ServiceSpec{
440 ClusterIP: testClusterIPv6,
441 ExternalIPs: []string{testExternalIPv6},
442 LoadBalancerSourceRanges: []string{testSourceRangeIPv6},
443 Ports: []v1.ServicePort{
444 {
445 Name: "testPort",
446 Port: int32(12345),
447 Protocol: v1.ProtocolTCP,
448 },
449 },
450 },
451 Status: v1.ServiceStatus{
452 LoadBalancer: v1.LoadBalancerStatus{
453 Ingress: []v1.LoadBalancerIngress{
454 {IP: testExternalIPv4},
455 {IP: testExternalIPv6},
456 },
457 },
458 },
459 },
460 expected: map[ServicePortName]*BaseServicePortInfo{
461 makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
462 bsvcPortInfo.externalIPs = makeIPs(testExternalIPv6)
463 bsvcPortInfo.loadBalancerSourceRanges = mustMakeCIDRs(testSourceRangeIPv6)
464 bsvcPortInfo.loadBalancerVIPs = makeIPs(testExternalIPv6)
465 }),
466 },
467 },
468 {
469 desc: "service with both ipv4 and ipv6 configurations under ipv4 mode, ipv6 fields should be filtered",
470 ipFamily: v1.IPv4Protocol,
471
472 service: &v1.Service{
473 ObjectMeta: metav1.ObjectMeta{
474 Name: "filterIPv6InIPV4Mode",
475 Namespace: "test",
476 },
477 Spec: v1.ServiceSpec{
478 ClusterIP: testClusterIPv4,
479 ExternalIPs: []string{testExternalIPv4, testExternalIPv6},
480 LoadBalancerSourceRanges: []string{testSourceRangeIPv4, testSourceRangeIPv6},
481 Ports: []v1.ServicePort{
482 {
483 Name: "testPort",
484 Port: int32(12345),
485 Protocol: v1.ProtocolTCP,
486 },
487 },
488 },
489 Status: v1.ServiceStatus{
490 LoadBalancer: v1.LoadBalancerStatus{
491 Ingress: []v1.LoadBalancerIngress{
492 {IP: testExternalIPv4},
493 {IP: testExternalIPv6},
494 },
495 },
496 },
497 },
498 expected: map[ServicePortName]*BaseServicePortInfo{
499 makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
500 bsvcPortInfo.externalIPs = makeIPs(testExternalIPv4)
501 bsvcPortInfo.loadBalancerSourceRanges = mustMakeCIDRs(testSourceRangeIPv4)
502 bsvcPortInfo.loadBalancerVIPs = makeIPs(testExternalIPv4)
503 }),
504 },
505 },
506 {
507 desc: "service with both ipv4 and ipv6 configurations under ipv6 mode, ipv4 fields should be filtered",
508 ipFamily: v1.IPv6Protocol,
509
510 service: &v1.Service{
511 ObjectMeta: metav1.ObjectMeta{
512 Name: "filterIPv4InIPV6Mode",
513 Namespace: "test",
514 },
515 Spec: v1.ServiceSpec{
516 ClusterIP: testClusterIPv6,
517 ExternalIPs: []string{testExternalIPv4, testExternalIPv6},
518 LoadBalancerSourceRanges: []string{testSourceRangeIPv4, testSourceRangeIPv6},
519 Ports: []v1.ServicePort{
520 {
521 Name: "testPort",
522 Port: int32(12345),
523 Protocol: v1.ProtocolTCP,
524 },
525 },
526 },
527 Status: v1.ServiceStatus{
528 LoadBalancer: v1.LoadBalancerStatus{
529 Ingress: []v1.LoadBalancerIngress{
530 {IP: testExternalIPv4},
531 {IP: testExternalIPv6},
532 },
533 },
534 },
535 },
536 expected: map[ServicePortName]*BaseServicePortInfo{
537 makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
538 bsvcPortInfo.externalIPs = makeIPs(testExternalIPv6)
539 bsvcPortInfo.loadBalancerSourceRanges = mustMakeCIDRs(testSourceRangeIPv6)
540 bsvcPortInfo.loadBalancerVIPs = makeIPs(testExternalIPv6)
541 }),
542 },
543 },
544 {
545 desc: "service with extra space in LoadBalancerSourceRanges",
546 ipFamily: v1.IPv4Protocol,
547
548 service: &v1.Service{
549 ObjectMeta: metav1.ObjectMeta{
550 Name: "extra-space",
551 Namespace: "test",
552 },
553 Spec: v1.ServiceSpec{
554 ClusterIP: testClusterIPv4,
555 LoadBalancerSourceRanges: []string{" 10.1.2.0/28"},
556 Ports: []v1.ServicePort{
557 {
558 Name: "testPort",
559 Port: int32(12345),
560 Protocol: v1.ProtocolTCP,
561 },
562 },
563 },
564 },
565 expected: map[ServicePortName]*BaseServicePortInfo{
566 makeServicePortName("test", "extra-space", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
567 bsvcPortInfo.loadBalancerSourceRanges = mustMakeCIDRs("10.1.2.0/28")
568 }),
569 },
570 },
571 }
572
573 for _, tc := range testCases {
574 t.Run(tc.desc, func(t *testing.T) {
575 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, tc.ipModeEnabled)()
576 svcTracker := NewServiceChangeTracker(nil, tc.ipFamily, nil, nil)
577
578 newServices := svcTracker.serviceToServiceMap(tc.service)
579
580 if len(newServices) != len(tc.expected) {
581 t.Fatalf("expected %d new, got %d: %v", len(tc.expected), len(newServices), dump.Pretty(newServices))
582 }
583 for svcKey, expectedInfo := range tc.expected {
584 svcInfo, exists := newServices[svcKey].(*BaseServicePortInfo)
585 if !exists {
586 t.Fatalf("[%s] expected to find key %s", tc.desc, svcKey)
587 }
588
589 if !svcInfo.clusterIP.Equal(expectedInfo.clusterIP) ||
590 svcInfo.port != expectedInfo.port ||
591 svcInfo.protocol != expectedInfo.protocol ||
592 svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort ||
593 !reflect.DeepEqual(svcInfo.externalIPs, expectedInfo.externalIPs) ||
594 !reflect.DeepEqual(svcInfo.loadBalancerSourceRanges, expectedInfo.loadBalancerSourceRanges) ||
595 !reflect.DeepEqual(svcInfo.loadBalancerVIPs, expectedInfo.loadBalancerVIPs) {
596 t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo)
597 }
598 for svcKey, expectedInfo := range tc.expected {
599 svcInfo, _ := newServices[svcKey].(*BaseServicePortInfo)
600 if !svcInfo.clusterIP.Equal(expectedInfo.clusterIP) ||
601 svcInfo.port != expectedInfo.port ||
602 svcInfo.protocol != expectedInfo.protocol ||
603 svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort ||
604 !reflect.DeepEqual(svcInfo.externalIPs, expectedInfo.externalIPs) ||
605 !reflect.DeepEqual(svcInfo.loadBalancerSourceRanges, expectedInfo.loadBalancerSourceRanges) ||
606 !reflect.DeepEqual(svcInfo.loadBalancerVIPs, expectedInfo.loadBalancerVIPs) {
607 t.Errorf("expected new[%v]to be %v, got %v", svcKey, expectedInfo, *svcInfo)
608 }
609 }
610 }
611 })
612 }
613 }
614
615 type FakeProxier struct {
616 endpointsChanges *EndpointsChangeTracker
617 serviceChanges *ServiceChangeTracker
618 svcPortMap ServicePortMap
619 endpointsMap EndpointsMap
620 hostname string
621 }
622
623 func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
624 return &FakeProxier{
625 svcPortMap: make(ServicePortMap),
626 serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
627 endpointsMap: make(EndpointsMap),
628 endpointsChanges: &EndpointsChangeTracker{
629 lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
630 trackerStartTime: t,
631 processEndpointsMapChange: nil,
632 endpointSliceCache: NewEndpointSliceCache(testHostname, ipFamily, nil, nil),
633 },
634 }
635 }
636
637 func makeServiceMap(fake *FakeProxier, allServices ...*v1.Service) {
638 for i := range allServices {
639 fake.addService(allServices[i])
640 }
641 }
642
643 func (proxier *FakeProxier) addService(service *v1.Service) {
644 proxier.serviceChanges.Update(nil, service)
645 }
646
647 func (proxier *FakeProxier) updateService(oldService *v1.Service, service *v1.Service) {
648 proxier.serviceChanges.Update(oldService, service)
649 }
650
651 func (proxier *FakeProxier) deleteService(service *v1.Service) {
652 proxier.serviceChanges.Update(service, nil)
653 }
654
655 func TestServiceMapUpdateHeadless(t *testing.T) {
656 fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
657
658 makeServiceMap(fp,
659 makeTestService("ns2", "headless", func(svc *v1.Service) {
660 svc.Spec.Type = v1.ServiceTypeClusterIP
661 svc.Spec.ClusterIP = v1.ClusterIPNone
662 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
663 }),
664 makeTestService("ns2", "headless-without-port", func(svc *v1.Service) {
665 svc.Spec.Type = v1.ServiceTypeClusterIP
666 svc.Spec.ClusterIP = v1.ClusterIPNone
667 }),
668 )
669
670
671 result := fp.svcPortMap.Update(fp.serviceChanges)
672 if len(fp.svcPortMap) != 0 {
673 t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
674 }
675
676 if len(result.UpdatedServices) != 0 {
677 t.Errorf("expected 0 updated services, got %d", len(result.UpdatedServices))
678 }
679 if len(result.DeletedUDPClusterIPs) != 0 {
680 t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
681 }
682
683
684 healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
685 if len(healthCheckNodePorts) != 0 {
686 t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts))
687 }
688 }
689
690 func TestUpdateServiceTypeExternalName(t *testing.T) {
691 fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
692
693 makeServiceMap(fp,
694 makeTestService("ns2", "external-name", func(svc *v1.Service) {
695 svc.Spec.Type = v1.ServiceTypeExternalName
696 svc.Spec.ClusterIP = "172.16.55.4"
697 svc.Spec.ExternalName = "foo2.bar.com"
698 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
699 }),
700 )
701
702 result := fp.svcPortMap.Update(fp.serviceChanges)
703 if len(fp.svcPortMap) != 0 {
704 t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
705 }
706 if len(result.UpdatedServices) != 0 {
707 t.Errorf("expected 0 updated services, got %v", result.UpdatedServices)
708 }
709 if len(result.DeletedUDPClusterIPs) != 0 {
710 t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
711 }
712
713
714 healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
715 if len(healthCheckNodePorts) != 0 {
716 t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
717 }
718 }
719
720 func TestBuildServiceMapAddRemove(t *testing.T) {
721 fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
722
723 services := []*v1.Service{
724 makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
725 svc.Spec.Type = v1.ServiceTypeClusterIP
726 svc.Spec.ClusterIP = "172.16.55.4"
727 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "UDP", 1234, 4321, 0)
728 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "UDP", 1235, 5321, 0)
729 }),
730 makeTestService("ns2", "node-port", func(svc *v1.Service) {
731 svc.Spec.Type = v1.ServiceTypeNodePort
732 svc.Spec.ClusterIP = "172.16.55.10"
733 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "UDP", 345, 678, 0)
734 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 344, 677, 0)
735 }),
736 makeTestService("ns1", "load-balancer", func(svc *v1.Service) {
737 svc.Spec.Type = v1.ServiceTypeLoadBalancer
738 svc.Spec.ClusterIP = "172.16.55.11"
739 svc.Spec.LoadBalancerIP = "5.6.7.8"
740 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
741 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
742 svc.Status.LoadBalancer = v1.LoadBalancerStatus{
743 Ingress: []v1.LoadBalancerIngress{
744 {IP: "10.1.2.4"},
745 },
746 }
747 }),
748 makeTestService("ns1", "only-local-load-balancer", func(svc *v1.Service) {
749 svc.Spec.Type = v1.ServiceTypeLoadBalancer
750 svc.Spec.ClusterIP = "172.16.55.12"
751 svc.Spec.LoadBalancerIP = "5.6.7.8"
752 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
753 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
754 svc.Status.LoadBalancer = v1.LoadBalancerStatus{
755 Ingress: []v1.LoadBalancerIngress{
756 {IP: "10.1.2.3"},
757 },
758 }
759 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
760 svc.Spec.HealthCheckNodePort = 345
761 }),
762 }
763
764 for i := range services {
765 fp.addService(services[i])
766 }
767
768 result := fp.svcPortMap.Update(fp.serviceChanges)
769 if len(fp.svcPortMap) != 8 {
770 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
771 }
772 for i := range services {
773 name := makeNSN(services[i].Namespace, services[i].Name)
774 if !result.UpdatedServices.Has(name) {
775 t.Errorf("expected updated service for %q", name)
776 }
777 }
778 if len(result.UpdatedServices) != len(services) {
779 t.Errorf("expected %d updated services, got %d", len(services), len(result.UpdatedServices))
780 }
781 if len(result.DeletedUDPClusterIPs) != 0 {
782
783 t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
784 }
785
786
787 healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
788 if len(healthCheckNodePorts) != 1 {
789 t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts)
790 } else {
791 nsn := makeNSN("ns1", "only-local-load-balancer")
792 if port, found := healthCheckNodePorts[nsn]; !found || port != 345 {
793 t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts)
794 }
795 }
796
797
798
799 oneService := makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
800 svc.Spec.Type = v1.ServiceTypeClusterIP
801 svc.Spec.ClusterIP = "172.16.55.4"
802 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "UDP", 1235, 5321, 0)
803 })
804
805 fp.updateService(services[0], oneService)
806 fp.deleteService(services[1])
807 fp.deleteService(services[2])
808 fp.deleteService(services[3])
809
810 result = fp.svcPortMap.Update(fp.serviceChanges)
811 if len(fp.svcPortMap) != 1 {
812 t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
813 }
814 if len(result.UpdatedServices) != 4 {
815 t.Errorf("expected 4 updated services, got %d", len(result.UpdatedServices))
816 }
817
818 healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
819 if len(healthCheckNodePorts) != 0 {
820 t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts)
821 }
822
823
824
825
826 expectedDeletedUDPClusterIPs := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
827 if len(result.DeletedUDPClusterIPs) != len(expectedDeletedUDPClusterIPs) {
828 t.Errorf("expected stale UDP services length %d, got %v", len(expectedDeletedUDPClusterIPs), result.DeletedUDPClusterIPs.UnsortedList())
829 }
830 for _, ip := range expectedDeletedUDPClusterIPs {
831 if !result.DeletedUDPClusterIPs.Has(ip) {
832 t.Errorf("expected stale UDP service service %s", ip)
833 }
834 }
835 }
836
837 func TestBuildServiceMapServiceUpdate(t *testing.T) {
838 fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
839
840 servicev1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
841 svc.Spec.Type = v1.ServiceTypeClusterIP
842 svc.Spec.ClusterIP = "172.16.55.4"
843 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0)
844 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "TCP", 1235, 5321, 0)
845 })
846 servicev2 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
847 svc.Spec.Type = v1.ServiceTypeLoadBalancer
848 svc.Spec.ClusterIP = "172.16.55.4"
849 svc.Spec.LoadBalancerIP = "5.6.7.8"
850 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 7002)
851 svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "TCP", 1235, 5321, 7003)
852 svc.Status.LoadBalancer = v1.LoadBalancerStatus{
853 Ingress: []v1.LoadBalancerIngress{
854 {IP: "10.1.2.3"},
855 },
856 }
857 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
858 svc.Spec.HealthCheckNodePort = 345
859 })
860
861 fp.addService(servicev1)
862
863 result := fp.svcPortMap.Update(fp.serviceChanges)
864 if len(fp.svcPortMap) != 2 {
865 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
866 }
867 if len(result.UpdatedServices) != 1 {
868 t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices))
869 }
870 if len(result.DeletedUDPClusterIPs) != 0 {
871
872 t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
873 }
874
875 healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
876 if len(healthCheckNodePorts) != 0 {
877 t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
878 }
879
880
881 fp.updateService(servicev1, servicev2)
882 result = fp.svcPortMap.Update(fp.serviceChanges)
883 if len(fp.svcPortMap) != 2 {
884 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
885 }
886 if len(result.UpdatedServices) != 1 {
887 t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices))
888 }
889 if len(result.DeletedUDPClusterIPs) != 0 {
890 t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
891 }
892
893 healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
894 if len(healthCheckNodePorts) != 1 {
895 t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
896 }
897
898
899
900 fp.updateService(servicev2, servicev2)
901 result = fp.svcPortMap.Update(fp.serviceChanges)
902 if len(fp.svcPortMap) != 2 {
903 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
904 }
905 if len(result.UpdatedServices) != 0 {
906 t.Errorf("expected 0 updated services, got %d", len(result.UpdatedServices))
907 }
908 if len(result.DeletedUDPClusterIPs) != 0 {
909 t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
910 }
911
912 healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
913 if len(healthCheckNodePorts) != 1 {
914 t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
915 }
916
917
918 fp.updateService(servicev2, servicev1)
919 result = fp.svcPortMap.Update(fp.serviceChanges)
920 if len(fp.svcPortMap) != 2 {
921 t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
922 }
923 if len(result.UpdatedServices) != 1 {
924 t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices))
925 }
926 if len(result.DeletedUDPClusterIPs) != 0 {
927
928 t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
929 }
930
931 healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
932 if len(healthCheckNodePorts) != 0 {
933 t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
934 }
935 }
936
View as plain text