1
2
3
4
19
20 package network
21
22 import (
23 "context"
24 "fmt"
25 "io"
26 "math/big"
27 "net"
28 "net/http"
29 "strconv"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 compute "google.golang.org/api/compute/v1"
36
37 appsv1 "k8s.io/api/apps/v1"
38 v1 "k8s.io/api/core/v1"
39 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
40 "k8s.io/apimachinery/pkg/types"
41 "k8s.io/apimachinery/pkg/util/intstr"
42 utilnet "k8s.io/apimachinery/pkg/util/net"
43 "k8s.io/apimachinery/pkg/util/sets"
44 "k8s.io/apimachinery/pkg/util/wait"
45 clientset "k8s.io/client-go/kubernetes"
46 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
47 e2eapps "k8s.io/kubernetes/test/e2e/apps"
48 "k8s.io/kubernetes/test/e2e/framework"
49 e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
50 e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
51 e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
52 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
53 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
54 e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
55 "k8s.io/kubernetes/test/e2e/framework/providers/gce"
56 e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
57 e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
58 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
59 "k8s.io/kubernetes/test/e2e/network/common"
60 admissionapi "k8s.io/pod-security-admission/api"
61 netutils "k8s.io/utils/net"
62 utilpointer "k8s.io/utils/pointer"
63
64 "github.com/onsi/ginkgo/v2"
65 "github.com/onsi/gomega"
66 )
67
68
69 func getInternalIP(node *v1.Node) (string, error) {
70 for _, address := range node.Status.Addresses {
71 if address.Type == v1.NodeInternalIP && address.Address != "" {
72 return address.Address, nil
73 }
74 }
75 return "", fmt.Errorf("couldn't get the internal IP of host %s with addresses %v", node.Name, node.Status.Addresses)
76 }
77
78
79
80
81
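// getSubnetPrefix derives an approximate node-subnet prefix from one worker
// node's internal IP. The LoadBalancer specs below use it to decide whether a
// given ingress or client IP lies inside the cluster's node network.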
82 func getSubnetPrefix(ctx context.Context, c clientset.Interface) (*net.IPNet, error) {
83 node, err := getReadySchedulableWorkerNode(ctx, c)
84 if err != nil {
85 return nil, fmt.Errorf("error getting a ready schedulable worker Node, err: %w", err)
86 }
87 internalIP, err := getInternalIP(node)
88 if err != nil {
89 return nil, fmt.Errorf("error getting Node internal IP, err: %w", err)
90 }
91 ip := netutils.ParseIPSloppy(internalIP)
92 if ip == nil {
93 return nil, fmt.Errorf("invalid IP address format: %s", internalIP)
94 }
95
96
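// The prefix widths used here (/64 for IPv6, /16 for IPv4) are assumptions about
// the node subnet size rather than values read back from the cloud provider.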
97 cidrMask := net.CIDRMask(64, 128)
98
99 if netutils.IsIPv4(ip) {
100 cidrMask = net.CIDRMask(16, 32)
101 }
102 return &net.IPNet{IP: ip.Mask(cidrMask), Mask: cidrMask}, nil
103 }
104
105
106
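// getReadySchedulableWorkerNode returns the first ready, schedulable node that is
// not labeled as a control-plane (or legacy master) node, or an error if the
// cluster has no such worker.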
107 func getReadySchedulableWorkerNode(ctx context.Context, c clientset.Interface) (*v1.Node, error) {
108 nodes, err := e2enode.GetReadySchedulableNodes(ctx, c)
109 if err != nil {
110 return nil, err
111 }
112 for i := range nodes.Items {
113 node := nodes.Items[i]
114 _, isMaster := node.Labels["node-role.kubernetes.io/master"]
115 _, isControlPlane := node.Labels["node-role.kubernetes.io/control-plane"]
116 if !isMaster && !isControlPlane {
117 return &node, nil
118 }
119 }
120 return nil, fmt.Errorf("there are currently no ready, schedulable worker nodes in the cluster")
121 }
122
123 var _ = common.SIGDescribe("LoadBalancers", func() {
124 f := framework.NewDefaultFramework("loadbalancers")
125 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
126
127 var cs clientset.Interface
128 var subnetPrefix *net.IPNet
129 var err error
130
131 ginkgo.BeforeEach(func(ctx context.Context) {
132 cs = f.ClientSet
133 subnetPrefix, err = getSubnetPrefix(ctx, cs)
134 framework.ExpectNoError(err)
135 })
136
137 ginkgo.AfterEach(func(ctx context.Context) {
138 if ginkgo.CurrentSpecReport().Failed() {
139 DescribeSvc(f.Namespace.Name)
140 }
141 })
142
143 f.It("should be able to change the type and ports of a TCP service", f.WithSlow(), func(ctx context.Context) {
144
145 e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
146
147 loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
148 if framework.ProviderIs("aws") {
149 loadBalancerLagTimeout = e2eservice.LoadBalancerLagTimeoutAWS
150 }
151 loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
152
153
154
155
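// The rest of this spec walks a single Service through ClusterIP -> NodePort ->
// LoadBalancer and back again, checking ports and reachability at every step, so
// the slow load-balancer turn-up only has to be paid once.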
156 serviceName := "mutability-test"
157 ns1 := f.Namespace.Name
158 framework.Logf("namespace for TCP test: %s", ns1)
159
160 ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1)
161 tcpJig := e2eservice.NewTestJig(cs, ns1, serviceName)
162 tcpService, err := tcpJig.CreateTCPService(ctx, nil)
163 framework.ExpectNoError(err)
164
165 svcPort := int(tcpService.Spec.Ports[0].Port)
166 framework.Logf("service port TCP: %d", svcPort)
167
168 ginkgo.By("creating a pod to be part of the TCP service " + serviceName)
169 _, err = tcpJig.Run(ctx, nil)
170 framework.ExpectNoError(err)
171
172 execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns1, "execpod", nil)
173 err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod)
174 framework.ExpectNoError(err)
175
176
177
178 ginkgo.By("changing the TCP service to type=NodePort")
179 tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
180 s.Spec.Type = v1.ServiceTypeNodePort
181 })
182 framework.ExpectNoError(err)
183 tcpNodePort := int(tcpService.Spec.Ports[0].NodePort)
184 framework.Logf("TCP node port: %d", tcpNodePort)
185
186 err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod)
187 framework.ExpectNoError(err)
188
189
190
191
192
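// On GCE/GKE, reserve a regional static IP up front so the load balancer comes up
// with a known address that the test can assert against (requestedIP below).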
193 requestedIP := ""
194 staticIPName := ""
195 if framework.ProviderIs("gce", "gke") {
196 ginkgo.By("creating a static load balancer IP")
197 staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID)
198 gceCloud, err := gce.GetGCECloud()
199 framework.ExpectNoError(err, "failed to get GCE cloud provider")
200
201 err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region())
202 defer func() {
203 if staticIPName != "" {
204
205 if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
206 framework.Logf("failed to release static IP %s: %v", staticIPName, err)
207 }
208 }
209 }()
210 framework.ExpectNoError(err, "failed to create region address: %s", staticIPName)
211 reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region())
212 framework.ExpectNoError(err, "failed to get region address: %s", staticIPName)
213
214 requestedIP = reservedAddr.Address
215 framework.Logf("Allocated static load balancer IP: %s", requestedIP)
216 }
217
218 ginkgo.By("changing the TCP service to type=LoadBalancer")
219 _, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
220 s.Spec.LoadBalancerIP = requestedIP
221 s.Spec.Type = v1.ServiceTypeLoadBalancer
222 })
223 framework.ExpectNoError(err)
224
225 ginkgo.By("waiting for the TCP service to have a load balancer")
226
227 tcpService, err = tcpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
228 framework.ExpectNoError(err)
229 if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort {
230 framework.Failf("TCP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", tcpNodePort, tcpService.Spec.Ports[0].NodePort)
231 }
232 if requestedIP != "" && e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP {
233 framework.Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
234 }
235 tcpIngressIP := e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
236 framework.Logf("TCP load balancer: %s", tcpIngressIP)
237
238 if framework.ProviderIs("gce", "gke") {
239
240
241
242
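// Deleting the reservation while the load balancer still holds the address
// "demotes" it to an ephemeral IP that is released together with the LB, so the
// deferred cleanup above becomes a no-op once staticIPName is cleared.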
243 ginkgo.By("demoting the static IP to ephemeral")
244 if staticIPName != "" {
245 gceCloud, err := gce.GetGCECloud()
246 framework.ExpectNoError(err, "failed to get GCE cloud provider")
247
248
249 if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
250 framework.Failf("failed to release static IP %s: %v", staticIPName, err)
251 }
252 staticIPName = ""
253 }
254 }
255
256 err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod)
257 framework.ExpectNoError(err)
258
259 ginkgo.By("hitting the TCP service's LoadBalancer")
260 e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout)
261
262
263
264 ginkgo.By("changing the TCP service's NodePort")
265 tcpService, err = tcpJig.ChangeServiceNodePort(ctx, tcpNodePort)
266 framework.ExpectNoError(err)
267 tcpNodePortOld := tcpNodePort
268 tcpNodePort = int(tcpService.Spec.Ports[0].NodePort)
269 if tcpNodePort == tcpNodePortOld {
270 framework.Failf("TCP Spec.Ports[0].NodePort (%d) did not change", tcpNodePort)
271 }
272 if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
273 framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
274 }
275 framework.Logf("TCP node port: %d", tcpNodePort)
276
277 ginkgo.By("hitting the TCP service's LoadBalancer")
278 e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout)
279
280
281
282 ginkgo.By("changing the TCP service's port")
283 tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
284 s.Spec.Ports[0].Port++
285 })
286 framework.ExpectNoError(err)
287 svcPortOld := svcPort
288 svcPort = int(tcpService.Spec.Ports[0].Port)
289 if svcPort == svcPortOld {
290 framework.Failf("TCP Spec.Ports[0].Port (%d) did not change", svcPort)
291 }
292 if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort {
293 framework.Failf("TCP Spec.Ports[0].NodePort (%d) changed", tcpService.Spec.Ports[0].NodePort)
294 }
295 if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
296 framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
297 }
298
299 framework.Logf("service port TCP: %d", svcPort)
300
301 ginkgo.By("hitting the TCP service's LoadBalancer")
302 e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout)
303
304 ginkgo.By("Scaling the pods to 0")
305 err = tcpJig.Scale(ctx, 0)
306 framework.ExpectNoError(err)
307
308 ginkgo.By("looking for ICMP REJECT on the TCP service's LoadBalancer")
309 testRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
310
311 ginkgo.By("Scaling the pods to 1")
312 err = tcpJig.Scale(ctx, 1)
313 framework.ExpectNoError(err)
314
315 ginkgo.By("hitting the TCP service's LoadBalancer")
316 e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout)
317
318
319
320 ginkgo.By("changing TCP service back to type=ClusterIP")
321 tcpReadback, err := tcpJig.UpdateService(ctx, func(s *v1.Service) {
322 s.Spec.Type = v1.ServiceTypeClusterIP
323 })
324 framework.ExpectNoError(err)
325 if tcpReadback.Spec.Ports[0].NodePort != 0 {
326 framework.Fail("TCP Spec.Ports[0].NodePort was not cleared")
327 }
328
329 _, err = tcpJig.WaitForLoadBalancerDestroy(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout)
330 framework.ExpectNoError(err)
331
332 ginkgo.By("checking the TCP LoadBalancer is closed")
333 testNotReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
334 })
335
336 f.It("should be able to change the type and ports of a UDP service", f.WithSlow(), func(ctx context.Context) {
337
338 e2eskipper.SkipUnlessProviderIs("gce", "gke")
339
340 loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
341 loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
342
343
344
345
346 serviceName := "mutability-test"
347 ns2 := f.Namespace.Name
348 framework.Logf("namespace for UDP test: %s", ns2)
349
350 ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in namespace " + ns2)
351 udpJig := e2eservice.NewTestJig(cs, ns2, serviceName)
352 udpService, err := udpJig.CreateUDPService(ctx, nil)
353 framework.ExpectNoError(err)
354
355 svcPort := int(udpService.Spec.Ports[0].Port)
356 framework.Logf("service port UDP: %d", svcPort)
357
358 ginkgo.By("creating a pod to be part of the UDP service " + serviceName)
359 _, err = udpJig.Run(ctx, nil)
360 framework.ExpectNoError(err)
361
362 execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns2, "execpod", nil)
363 err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
364 framework.ExpectNoError(err)
365
366
367
368 ginkgo.By("changing the UDP service to type=NodePort")
369 udpService, err = udpJig.UpdateService(ctx, func(s *v1.Service) {
370 s.Spec.Type = v1.ServiceTypeNodePort
371 })
372 framework.ExpectNoError(err)
373 udpNodePort := int(udpService.Spec.Ports[0].NodePort)
374 framework.Logf("UDP node port: %d", udpNodePort)
375
376 err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
377 framework.ExpectNoError(err)
378
379
380
381
382
383 requestedIP := ""
384 staticIPName := ""
385 ginkgo.By("creating a static load balancer IP")
386 staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID)
387 gceCloud, err := gce.GetGCECloud()
388 framework.ExpectNoError(err, "failed to get GCE cloud provider")
389
390 err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region())
391 defer func() {
392 if staticIPName != "" {
393
394 if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
395 framework.Logf("failed to release static IP %s: %v", staticIPName, err)
396 }
397 }
398 }()
399 framework.ExpectNoError(err, "failed to create region address: %s", staticIPName)
400 reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region())
401 framework.ExpectNoError(err, "failed to get region address: %s", staticIPName)
402
403 requestedIP = reservedAddr.Address
404 framework.Logf("Allocated static load balancer IP: %s", requestedIP)
405
406 ginkgo.By("changing the UDP service to type=LoadBalancer")
407 _, err = udpJig.UpdateService(ctx, func(s *v1.Service) {
408 s.Spec.Type = v1.ServiceTypeLoadBalancer
409 })
410 framework.ExpectNoError(err)
411
412
413
414
415
416 ginkgo.By("demoting the static IP to ephemeral")
417 if staticIPName != "" {
418 gceCloud, err := gce.GetGCECloud()
419 framework.ExpectNoError(err, "failed to get GCE cloud provider")
420
421
422 if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
423 framework.Failf("failed to release static IP %s: %v", staticIPName, err)
424 }
425 staticIPName = ""
426 }
427
428 var udpIngressIP string
429 ginkgo.By("waiting for the UDP service to have a load balancer")
430
431 udpService, err = udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
432 framework.ExpectNoError(err)
433 if int(udpService.Spec.Ports[0].NodePort) != udpNodePort {
434 framework.Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort)
435 }
436 udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
437 framework.Logf("UDP load balancer: %s", udpIngressIP)
438
439 err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
440 framework.ExpectNoError(err)
441
442 ginkgo.By("hitting the UDP service's LoadBalancer")
443 testReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
444
445
446
447 ginkgo.By("changing the UDP service's NodePort")
448 udpService, err = udpJig.ChangeServiceNodePort(ctx, udpNodePort)
449 framework.ExpectNoError(err)
450 udpNodePortOld := udpNodePort
451 udpNodePort = int(udpService.Spec.Ports[0].NodePort)
452 if udpNodePort == udpNodePortOld {
453 framework.Failf("UDP Spec.Ports[0].NodePort (%d) did not change", udpNodePort)
454 }
455 if e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
456 framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
457 }
458 framework.Logf("UDP node port: %d", udpNodePort)
459
460 err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
461 framework.ExpectNoError(err)
462
463 ginkgo.By("hitting the UDP service's LoadBalancer")
464 testReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
465
466
467
468 ginkgo.By("changing the UDP service's port")
469 udpService, err = udpJig.UpdateService(ctx, func(s *v1.Service) {
470 s.Spec.Ports[0].Port++
471 })
472 framework.ExpectNoError(err)
473 svcPortOld := svcPort
474 svcPort = int(udpService.Spec.Ports[0].Port)
475 if svcPort == svcPortOld {
476 framework.Failf("UDP Spec.Ports[0].Port (%d) did not change", svcPort)
477 }
478 if int(udpService.Spec.Ports[0].NodePort) != udpNodePort {
479 framework.Failf("UDP Spec.Ports[0].NodePort (%d) changed", udpService.Spec.Ports[0].NodePort)
480 }
481 if e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
482 framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
483 }
484
485 framework.Logf("service port UDP: %d", svcPort)
486
487 ginkgo.By("hitting the UDP service's NodePort")
488 err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
489 framework.ExpectNoError(err)
490
491 ginkgo.By("hitting the UDP service's LoadBalancer")
492 testReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
493
494 ginkgo.By("Scaling the pods to 0")
495 err = udpJig.Scale(ctx, 0)
496 framework.ExpectNoError(err)
497
498 ginkgo.By("looking for ICMP REJECT on the UDP service's LoadBalancer")
499 testRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
500
501 ginkgo.By("Scaling the pods to 1")
502 err = udpJig.Scale(ctx, 1)
503 framework.ExpectNoError(err)
504
505 ginkgo.By("hitting the UDP service's NodePort")
506 err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
507 framework.ExpectNoError(err)
508
509 ginkgo.By("hitting the UDP service's LoadBalancer")
510 testReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
511
512
513
514 ginkgo.By("changing UDP service back to type=ClusterIP")
515 udpReadback, err := udpJig.UpdateService(ctx, func(s *v1.Service) {
516 s.Spec.Type = v1.ServiceTypeClusterIP
517 })
518 framework.ExpectNoError(err)
519 if udpReadback.Spec.Ports[0].NodePort != 0 {
520 framework.Fail("UDP Spec.Ports[0].NodePort was not cleared")
521 }
522
523 _, err = udpJig.WaitForLoadBalancerDestroy(ctx, udpIngressIP, svcPort, loadBalancerCreateTimeout)
524 framework.ExpectNoError(err)
525
526 ginkgo.By("checking the UDP LoadBalancer is closed")
527 testNotReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
528 })
529
530 f.It("should only allow access from service loadbalancer source ranges", f.WithSlow(), func(ctx context.Context) {
531
532 e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws", "azure")
533
534 loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
535
536 namespace := f.Namespace.Name
537 serviceName := "lb-sourcerange"
538 jig := e2eservice.NewTestJig(cs, namespace, serviceName)
539
540 ginkgo.By("Prepare allow source ips")
541
542
543 acceptPod := e2epod.CreateExecPodOrFail(ctx, cs, namespace, "execpod-accept", nil)
544 dropPod := e2epod.CreateExecPodOrFail(ctx, cs, namespace, "execpod-drop", nil)
545
546 ginkgo.By("creating a pod to be part of the service " + serviceName)
547
548
549 _, err := jig.Run(ctx, nil)
550 framework.ExpectNoError(err)
551
552 acceptPod, err = cs.CoreV1().Pods(namespace).Get(ctx, acceptPod.Name, metav1.GetOptions{})
553 framework.ExpectNoError(err, "Unable to get pod %s", acceptPod.Name)
554 gomega.Expect(acceptPod.Status.Phase).To(gomega.Equal(v1.PodRunning))
555 gomega.Expect(acceptPod.Status.PodIP).ToNot(gomega.BeEmpty())
556
557
558 svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
559 svc.Spec.Type = v1.ServiceTypeLoadBalancer
560 svc.Spec.LoadBalancerSourceRanges = []string{acceptPod.Status.PodIP + "/32"}
561 })
562 framework.ExpectNoError(err)
563
564 ginkgo.DeferCleanup(func(ctx context.Context) {
565 ginkgo.By("Clean up loadbalancer service")
566 e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name)
567 })
568
569 svc, err = jig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
570 framework.ExpectNoError(err)
571
572 ginkgo.By("check reachability from different sources")
573 svcIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
574
575
576
577
578 checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP)
579 checkReachabilityFromPod(false, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP)
580
581
582 dropPod, err = cs.CoreV1().Pods(namespace).Get(ctx, dropPod.Name, metav1.GetOptions{})
583 framework.ExpectNoError(err, "Unable to get pod %s", dropPod.Name)
584 gomega.Expect(dropPod.Status.Phase).To(gomega.Equal(v1.PodRunning))
585 gomega.Expect(dropPod.Status.PodIP).ToNot(gomega.BeEmpty())
586
587 ginkgo.By("Update service LoadBalancerSourceRange and check reachability")
588 _, err = jig.UpdateService(ctx, func(svc *v1.Service) {
589
590 svc.Spec.LoadBalancerSourceRanges = []string{dropPod.Status.PodIP + "/32"}
591 })
592 framework.ExpectNoError(err)
593
594
595
596
597
598 checkReachabilityFromPod(false, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP)
599 checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP)
600
601 ginkgo.By("Delete LoadBalancerSourceRange field and check reachability")
602 _, err = jig.UpdateService(ctx, func(svc *v1.Service) {
603 svc.Spec.LoadBalancerSourceRanges = nil
604 })
605 framework.ExpectNoError(err)
606
607
608
609
610 checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP)
611 checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP)
612 })
613
614 f.It("should be able to create an internal type load balancer", f.WithSlow(), func(ctx context.Context) {
615 e2eskipper.SkipUnlessProviderIs("gke", "gce")
616
617 createTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
618 pollInterval := framework.Poll * 10
619
620 namespace := f.Namespace.Name
621 serviceName := "lb-internal"
622 jig := e2eservice.NewTestJig(cs, namespace, serviceName)
623
624 ginkgo.By("creating pod to be part of service " + serviceName)
625 _, err := jig.Run(ctx, nil)
626 framework.ExpectNoError(err)
627
628 enableILB, disableILB := enableAndDisableInternalLB()
629
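// An ingress endpoint counts as "internal" when it falls inside the node subnet
// prefix computed in BeforeEach; cloud-assigned external VIPs should fall outside it.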
630 isInternalEndpoint := func(lbIngress *v1.LoadBalancerIngress) bool {
631 ingressEndpoint := e2eservice.GetIngressPoint(lbIngress)
632 ingressIP := netutils.ParseIPSloppy(ingressEndpoint)
633 if ingressIP == nil {
634 framework.Failf("invalid ingressEndpoint IP address format: %s", ingressEndpoint)
635 }
636
637 return subnetPrefix.Contains(ingressIP)
638 }
639
640 ginkgo.By("creating a service with type LoadBalancer and cloud specific Internal-LB annotation enabled")
641 svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
642 svc.Spec.Type = v1.ServiceTypeLoadBalancer
643 enableILB(svc)
644 })
645 framework.ExpectNoError(err)
646
647 ginkgo.DeferCleanup(func(ctx context.Context) {
648 ginkgo.By("Clean up loadbalancer service")
649 e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name)
650 })
651
652 svc, err = jig.WaitForLoadBalancer(ctx, createTimeout)
653 framework.ExpectNoError(err)
654 lbIngress := &svc.Status.LoadBalancer.Ingress[0]
655 svcPort := int(svc.Spec.Ports[0].Port)
656
657 if !isInternalEndpoint(lbIngress) {
658 framework.Failf("lbIngress %v doesn't have an internal IP", lbIngress)
659 }
660
661
662
663 ginkgo.By("hitting the internal load balancer from pod")
664 framework.Logf("creating pod with host network")
665 hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "ilb-host-exec")
666
667 framework.Logf("Waiting up to %v for service %q's internal LB to respond to requests", createTimeout, serviceName)
668 tcpIngressIP := e2eservice.GetIngressPoint(lbIngress)
669 if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) {
670 cmd := fmt.Sprintf(`curl -m 5 'http://%v:%v/echo?msg=hello'`, tcpIngressIP, svcPort)
671 stdout, err := e2eoutput.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
672 if err != nil {
673 framework.Logf("error curling; stdout: %v. err: %v", stdout, err)
674 return false, nil
675 }
676
677 if !strings.Contains(stdout, "hello") {
678 framework.Logf("Expected output to contain 'hello', got %q; retrying...", stdout)
679 return false, nil
680 }
681
682 framework.Logf("Successful curl; stdout: %v", stdout)
683 return true, nil
684 }); pollErr != nil {
685 framework.Failf("ginkgo.Failed to hit ILB IP, err: %v", pollErr)
686 }
687
688 ginkgo.By("switching to external type LoadBalancer")
689 svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
690 disableILB(svc)
691 })
692 framework.ExpectNoError(err)
693 framework.Logf("Waiting up to %v for service %q to have an external LoadBalancer", createTimeout, serviceName)
694 if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) {
695 svc, err := cs.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
696 if err != nil {
697 return false, err
698 }
699 lbIngress = &svc.Status.LoadBalancer.Ingress[0]
700 return !isInternalEndpoint(lbIngress), nil
701 }); pollErr != nil {
702 framework.Failf("Loadbalancer IP not changed to external.")
703 }
704
705 gomega.Expect(isInternalEndpoint(lbIngress)).To(gomega.BeFalse())
706
707 ginkgo.By("hitting the external load balancer")
708 framework.Logf("Waiting up to %v for service %q's external LB to respond to requests", createTimeout, serviceName)
709 tcpIngressIP = e2eservice.GetIngressPoint(lbIngress)
710 e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, e2eservice.LoadBalancerLagTimeoutDefault)
711
712
713
714 if framework.ProviderIs("azure") {
715 ginkgo.By("switching back to interal type LoadBalancer, with static IP specified.")
716
717 base := netutils.BigForIP(subnetPrefix.IP)
718 offset := big.NewInt(0).SetBytes(netutils.ParseIPSloppy("0.0.11.11").To4()).Int64()
719
720 internalStaticIP := netutils.AddIPOffset(base, int(offset)).String()
721
722 svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
723 svc.Spec.LoadBalancerIP = internalStaticIP
724 enableILB(svc)
725 })
726 framework.ExpectNoError(err)
727 framework.Logf("Waiting up to %v for service %q to have an internal LoadBalancer", createTimeout, serviceName)
728 if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) {
729 svc, err := cs.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
730 if err != nil {
731 return false, err
732 }
733 lbIngress = &svc.Status.LoadBalancer.Ingress[0]
734 return isInternalEndpoint(lbIngress), nil
735 }); pollErr != nil {
736 framework.Failf("Loadbalancer IP not changed to internal.")
737 }
738
739 gomega.Expect(e2eservice.GetIngressPoint(lbIngress)).To(gomega.Equal(internalStaticIP))
740 }
741 })
742
743
744 f.It("should have session affinity work for LoadBalancer service with ESIPP on", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) {
745
746 e2eskipper.SkipIfProviderIs("aws")
747
748 svc := getServeHostnameService("affinity-lb-esipp")
749 svc.Spec.Type = v1.ServiceTypeLoadBalancer
750 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
751 execAffinityTestForLBService(ctx, f, cs, svc)
752 })
753
754
755 f.It("should be able to switch session affinity for LoadBalancer service with ESIPP on", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) {
756
757 e2eskipper.SkipIfProviderIs("aws")
758
759 svc := getServeHostnameService("affinity-lb-esipp-transition")
760 svc.Spec.Type = v1.ServiceTypeLoadBalancer
761 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
762 execAffinityTestForLBServiceWithTransition(ctx, f, cs, svc)
763 })
764
765
766 f.It("should have session affinity work for LoadBalancer service with ESIPP off", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) {
767
768 e2eskipper.SkipIfProviderIs("aws")
769
770 svc := getServeHostnameService("affinity-lb")
771 svc.Spec.Type = v1.ServiceTypeLoadBalancer
772 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
773 execAffinityTestForLBService(ctx, f, cs, svc)
774 })
775
776
777 f.It("should be able to switch session affinity for LoadBalancer service with ESIPP off", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) {
778
779 e2eskipper.SkipIfProviderIs("aws")
780
781 svc := getServeHostnameService("affinity-lb-transition")
782 svc.Spec.Type = v1.ServiceTypeLoadBalancer
783 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
784 execAffinityTestForLBServiceWithTransition(ctx, f, cs, svc)
785 })
786
787
788
789
790
791
792
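// Verifies the load-balancer cleanup finalizer lifecycle: it should be present
// while the Service is type=LoadBalancer, removed after a change to ClusterIP,
// and added back when the type returns to LoadBalancer.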
793 f.It("should handle load balancer cleanup finalizer for service", f.WithSlow(), func(ctx context.Context) {
794 jig := e2eservice.NewTestJig(cs, f.Namespace.Name, "lb-finalizer")
795
796 ginkgo.By("Create load balancer service")
797 svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
798 svc.Spec.Type = v1.ServiceTypeLoadBalancer
799 })
800 framework.ExpectNoError(err)
801
802 ginkgo.DeferCleanup(func(ctx context.Context) {
803 ginkgo.By("Check that service can be deleted with finalizer")
804 e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name)
805 })
806
807 ginkgo.By("Wait for load balancer to serve traffic")
808 svc, err = jig.WaitForLoadBalancer(ctx, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
809 framework.ExpectNoError(err)
810
811 ginkgo.By("Check if finalizer presents on service with type=LoadBalancer")
812 e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, true)
813
814 ginkgo.By("Check if finalizer is removed on service after changed to type=ClusterIP")
815 err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
816 framework.ExpectNoError(err)
817 e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, false)
818
819 ginkgo.By("Check if finalizer is added back to service after changed to type=LoadBalancer")
820 err = jig.ChangeServiceType(ctx, v1.ServiceTypeLoadBalancer, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
821 framework.ExpectNoError(err)
822 e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, true)
823 })
824
825 f.It("should be able to create LoadBalancer Service without NodePort and change it", f.WithSlow(), func(ctx context.Context) {
826
827 e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
828
829 loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
830 if framework.ProviderIs("aws") {
831 loadBalancerLagTimeout = e2eservice.LoadBalancerLagTimeoutAWS
832 }
833 loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
834
835
836
837
838 serviceName := "reallocate-nodeport-test"
839 ns1 := f.Namespace.Name
840 framework.Logf("namespace for TCP test: %s", ns1)
841
842 ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1)
843 tcpJig := e2eservice.NewTestJig(cs, ns1, serviceName)
844 tcpService, err := tcpJig.CreateTCPService(ctx, nil)
845 framework.ExpectNoError(err)
846
847 svcPort := int(tcpService.Spec.Ports[0].Port)
848 framework.Logf("service port TCP: %d", svcPort)
849
850 ginkgo.By("creating a pod to be part of the TCP service " + serviceName)
851 _, err = tcpJig.Run(ctx, nil)
852 framework.ExpectNoError(err)
853
854
855
856
857
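// This spec exercises spec.allocateLoadBalancerNodePorts: the LB is first created
// with node-port allocation disabled (no NodePort expected), then allocation is
// enabled and a NodePort must appear while the ingress IP stays the same.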
858 requestedIP := ""
859 staticIPName := ""
860 if framework.ProviderIs("gce", "gke") {
861 ginkgo.By("creating a static load balancer IP")
862 staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID)
863 gceCloud, err := gce.GetGCECloud()
864 framework.ExpectNoError(err, "failed to get GCE cloud provider")
865
866 err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region())
867 ginkgo.DeferCleanup(func(ctx context.Context) {
868 if staticIPName != "" {
869
870 if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
871 framework.Logf("failed to release static IP %s: %v", staticIPName, err)
872 }
873 }
874 })
875 framework.ExpectNoError(err, "failed to create region address: %s", staticIPName)
876 reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region())
877 framework.ExpectNoError(err, "failed to get region address: %s", staticIPName)
878
879 requestedIP = reservedAddr.Address
880 framework.Logf("Allocated static load balancer IP: %s", requestedIP)
881 }
882
883 ginkgo.By("changing the TCP service to type=LoadBalancer")
884 _, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
885 s.Spec.LoadBalancerIP = requestedIP
886 s.Spec.Type = v1.ServiceTypeLoadBalancer
887 s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(false)
888 })
889 framework.ExpectNoError(err)
890
891 ginkgo.By("waiting for the TCP service to have a load balancer")
892
893 tcpService, err = tcpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
894 framework.ExpectNoError(err)
895 if int(tcpService.Spec.Ports[0].NodePort) != 0 {
896 framework.Failf("TCP Spec.Ports[0].NodePort allocated %d when not expected", tcpService.Spec.Ports[0].NodePort)
897 }
898 if requestedIP != "" && e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP {
899 framework.Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
900 }
901 tcpIngressIP := e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
902 framework.Logf("TCP load balancer: %s", tcpIngressIP)
903
904 if framework.ProviderIs("gce", "gke") {
905
906
907
908
909 ginkgo.By("demoting the static IP to ephemeral")
910 if staticIPName != "" {
911 gceCloud, err := gce.GetGCECloud()
912 framework.ExpectNoError(err, "failed to get GCE cloud provider")
913
914
915 if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
916 framework.Failf("failed to release static IP %s: %v", staticIPName, err)
917 }
918 staticIPName = ""
919 }
920 }
921
922 ginkgo.By("hitting the TCP service's LoadBalancer")
923 e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout)
924
925
926
927 ginkgo.By("adding a TCP service's NodePort")
928 tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
929 s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
930 })
931 framework.ExpectNoError(err)
932 tcpNodePort := int(tcpService.Spec.Ports[0].NodePort)
933 if tcpNodePort == 0 {
934 framework.Failf("TCP Spec.Ports[0].NodePort (%d) not allocated", tcpNodePort)
935 }
936 if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
937 framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
938 }
939 framework.Logf("TCP node port: %d", tcpNodePort)
940
941 ginkgo.By("hitting the TCP service's LoadBalancer")
942 e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout)
943 })
944
945 ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a LoadBalancer service on different nodes", func(ctx context.Context) {
946
947 e2eskipper.SkipUnlessProviderIs("gce", "gke", "azure")
948 ns := f.Namespace.Name
949 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
950 framework.ExpectNoError(err)
951 if len(nodes.Items) < 2 {
952 e2eskipper.Skipf(
953 "Test requires >= 2 Ready nodes, but there are only %v nodes",
954 len(nodes.Items))
955 }
956
957 loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
958 loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
959
960
961 udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
962 ginkgo.By("creating a UDP service " + serviceName + " with type=LoadBalancer in " + ns)
963 _, err = udpJig.CreateUDPService(ctx, func(svc *v1.Service) {
964 svc.Spec.Type = v1.ServiceTypeLoadBalancer
965 svc.Spec.Ports = []v1.ServicePort{
966 {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)},
967 }
968 })
969 framework.ExpectNoError(err)
970
971 var udpIngressIP string
972 ginkgo.By("waiting for the UDP service to have a load balancer")
973 udpService, err := udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
974 framework.ExpectNoError(err)
975
976 udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
977 framework.Logf("UDP load balancer: %s", udpIngressIP)
978
979
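// A background goroutine keeps sending "hostname" queries to the LB VIP from a
// fixed source port (54321), so the same UDP 5-tuple is reused for every request.
// Stale conntrack state for that 5-tuple is exactly what could keep steering
// traffic to a deleted backend, which is what this spec checks for.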
980 ginkgo.By("hitting the UDP service's LoadBalancer with same source port")
981 stopCh := make(chan struct{})
982 defer close(stopCh)
983 var mu sync.Mutex
984 hostnames := sets.NewString()
985 go func() {
986 defer ginkgo.GinkgoRecover()
987 port := int(udpService.Spec.Ports[0].Port)
988 laddr, err := net.ResolveUDPAddr("udp", ":54321")
989 if err != nil {
990 framework.Failf("Failed to resolve local address: %v", err)
991 }
992 raddr := net.UDPAddr{IP: netutils.ParseIPSloppy(udpIngressIP), Port: port}
993
994 for {
995 select {
996 case <-stopCh:
997 if len(hostnames) != 2 {
998 framework.Failf("Failed to hit the 2 UDP LoadBalancer backends successfully, got %v", hostnames.List())
999 }
1000 return
1001 default:
1002 time.Sleep(1 * time.Second)
1003 }
1004
1005 conn, err := net.DialUDP("udp", laddr, &raddr)
1006 if err != nil {
1007 framework.Logf("Failed to connect to: %s %d", udpIngressIP, port)
1008 continue
1009 }
1010 conn.SetDeadline(time.Now().Add(3 * time.Second))
1011 framework.Logf("Connected successfully to: %s", raddr.String())
1012 conn.Write([]byte("hostname\n"))
1013 buff := make([]byte, 1024)
1014 n, _, err := conn.ReadFrom(buff)
1015 if err == nil {
1016 mu.Lock()
1017 hostnames.Insert(string(buff[:n]))
1018 mu.Unlock()
1019 framework.Logf("Connected successfully to hostname: %s", string(buff[:n]))
1020 }
1021 conn.Close()
1022 }
1023 }()
1024
1025
1026 ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
1027 serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
1028 serverPod1.Labels = udpJig.Labels
1029 serverPod1.Spec.Hostname = "hostname1"
1030 nodeSelection := e2epod.NodeSelection{Name: nodes.Items[0].Name}
1031 e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
1032 e2epod.NewPodClient(f).CreateSync(ctx, serverPod1)
1033
1034 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend1: {80}})
1035
1036
1037
1038
1039
1040
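// A fresh Endpoints object does not mean kube-proxy has already reprogrammed the
// node (and UDP conntrack entries have their own lifetime), so poll until the
// background client actually reports backend 1's hostname.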
1041 ginkgo.By("checking client pod connected to the backend 1 on Node " + nodes.Items[0].Name)
1042 if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) {
1043 mu.Lock()
1044 defer mu.Unlock()
1045 return hostnames.Has(serverPod1.Spec.Hostname), nil
1046 }); err != nil {
1047 framework.Failf("Failed to connect to backend 1")
1048 }
1049
1050
1051 ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
1052 serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
1053 serverPod2.Labels = udpJig.Labels
1054 serverPod2.Spec.Hostname = "hostname2"
1055 nodeSelection = e2epod.NodeSelection{Name: nodes.Items[1].Name}
1056 e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection)
1057 e2epod.NewPodClient(f).CreateSync(ctx, serverPod2)
1058
1059
1060 framework.Logf("Cleaning up %s pod", podBackend1)
1061 e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout)
1062
1063 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend2: {80}})
1064
1065
1066
1067 ginkgo.By("checking client pod connected to the backend 2 on Node " + nodes.Items[1].Name)
1068 if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) {
1069 mu.Lock()
1070 defer mu.Unlock()
1071 return hostnames.Has(serverPod2.Spec.Hostname), nil
1072 }); err != nil {
1073 framework.Failf("Failed to connect to backend 2")
1074 }
1075 })
1076
1077 ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a LoadBalancer service on the same nodes", func(ctx context.Context) {
1078
1079 e2eskipper.SkipUnlessProviderIs("gce", "gke", "azure")
1080 ns := f.Namespace.Name
1081 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 1)
1082 framework.ExpectNoError(err)
1083 if len(nodes.Items) < 1 {
1084 e2eskipper.Skipf(
1085 "Test requires >= 1 Ready nodes, but there are only %d nodes",
1086 len(nodes.Items))
1087 }
1088
1089 loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
1090 loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
1091
1092
1093 udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
1094 ginkgo.By("creating a UDP service " + serviceName + " with type=LoadBalancer in " + ns)
1095 _, err = udpJig.CreateUDPService(ctx, func(svc *v1.Service) {
1096 svc.Spec.Type = v1.ServiceTypeLoadBalancer
1097 svc.Spec.Ports = []v1.ServicePort{
1098 {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)},
1099 }
1100 })
1101 framework.ExpectNoError(err)
1102
1103 var udpIngressIP string
1104 ginkgo.By("waiting for the UDP service to have a load balancer")
1105 udpService, err := udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
1106 framework.ExpectNoError(err)
1107
1108 udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
1109 framework.Logf("UDP load balancer: %s", udpIngressIP)
1110
1111
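// Same background probe as the previous spec, but from source port 54322 and with
// both backend pods pinned to the same node.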
1112 ginkgo.By("hitting the UDP service's LoadBalancer with same source port")
1113 stopCh := make(chan struct{})
1114 defer close(stopCh)
1115 var mu sync.Mutex
1116 hostnames := sets.NewString()
1117 go func() {
1118 defer ginkgo.GinkgoRecover()
1119 port := int(udpService.Spec.Ports[0].Port)
1120 laddr, err := net.ResolveUDPAddr("udp", ":54322")
1121 if err != nil {
1122 framework.Failf("Failed to resolve local address: %v", err)
1123 }
1124 raddr := net.UDPAddr{IP: netutils.ParseIPSloppy(udpIngressIP), Port: port}
1125
1126 for {
1127 select {
1128 case <-stopCh:
1129 if len(hostnames) != 2 {
1130 framework.Failf("Failed to hit the 2 UDP LoadBalancer backends successfully, got %v", hostnames.List())
1131 }
1132 return
1133 default:
1134 time.Sleep(1 * time.Second)
1135 }
1136
1137 conn, err := net.DialUDP("udp", laddr, &raddr)
1138 if err != nil {
1139 framework.Logf("Failed to connect to: %s %d", udpIngressIP, port)
1140 continue
1141 }
1142 conn.SetDeadline(time.Now().Add(3 * time.Second))
1143 framework.Logf("Connected successfully to: %s", raddr.String())
1144 conn.Write([]byte("hostname\n"))
1145 buff := make([]byte, 1024)
1146 n, _, err := conn.ReadFrom(buff)
1147 if err == nil {
1148 mu.Lock()
1149 hostnames.Insert(string(buff[:n]))
1150 mu.Unlock()
1151 framework.Logf("Connected successfully to hostname: %s", string(buff[:n]))
1152 }
1153 conn.Close()
1154 }
1155 }()
1156
1157
1158 ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
1159 serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
1160 serverPod1.Labels = udpJig.Labels
1161 serverPod1.Spec.Hostname = "hostname1"
1162 nodeSelection := e2epod.NodeSelection{Name: nodes.Items[0].Name}
1163 e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
1164 e2epod.NewPodClient(f).CreateSync(ctx, serverPod1)
1165
1166 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend1: {80}})
1167
1168
1169
1170
1171
1172
1173 ginkgo.By("checking client pod connected to the backend 1 on Node " + nodes.Items[0].Name)
1174 if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) {
1175 mu.Lock()
1176 defer mu.Unlock()
1177 return hostnames.Has(serverPod1.Spec.Hostname), nil
1178 }); err != nil {
1179 framework.Failf("Failed to connect to backend 1")
1180 }
1181
1182
1183 ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
1184 serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
1185 serverPod2.Labels = udpJig.Labels
1186 serverPod2.Spec.Hostname = "hostname2"
1187
1188 e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection)
1189 e2epod.NewPodClient(f).CreateSync(ctx, serverPod2)
1190
1191
1192 framework.Logf("Cleaning up %s pod", podBackend1)
1193 e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout)
1194
1195 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend2: {80}})
1196
1197
1198
1199 ginkgo.By("checking client pod connected to the backend 2 on Node " + nodes.Items[0].Name)
1200 if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) {
1201 mu.Lock()
1202 defer mu.Unlock()
1203 return hostnames.Has(serverPod2.Spec.Hostname), nil
1204 }); err != nil {
1205 framework.Failf("Failed to connect to backend 2")
1206 }
1207 })
1208
1209 f.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Cluster", f.WithSlow(), func(ctx context.Context) {
1210
1211
1212
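// testRollingUpdateLBConnectivityDisruption presumably keeps hitting the LB for
// the duration of a rolling update of the backends; at least 95% of those
// requests must succeed.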
1213 minSuccessRate := 0.95
1214
1215 testRollingUpdateLBConnectivityDisruption(ctx, f, v1.ServiceExternalTrafficPolicyTypeCluster, minSuccessRate)
1216 })
1217
1218 f.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Local", f.WithSlow(), func(ctx context.Context) {
1219
1220
1221
1222 minSuccessRate := 0.95
1223
1224 testRollingUpdateLBConnectivityDisruption(ctx, f, v1.ServiceExternalTrafficPolicyTypeLocal, minSuccessRate)
1225 })
1226 })
1227
1228 var _ = common.SIGDescribe("LoadBalancers ESIPP", framework.WithSlow(), func() {
1229 f := framework.NewDefaultFramework("esipp")
1230 f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
1231 var loadBalancerCreateTimeout time.Duration
1232
1233 var cs clientset.Interface
1234 var subnetPrefix *net.IPNet
1235 var err error
1236
1237 ginkgo.BeforeEach(func(ctx context.Context) {
1238
1239 e2eskipper.SkipUnlessProviderIs("gce", "gke")
1240
1241 cs = f.ClientSet
1242 loadBalancerCreateTimeout = e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
1243 subnetPrefix, err = getSubnetPrefix(ctx, cs)
1244 framework.ExpectNoError(err)
1245 })
1246
1247 ginkgo.AfterEach(func(ctx context.Context) {
1248 if ginkgo.CurrentSpecReport().Failed() {
1249 DescribeSvc(f.Namespace.Name)
1250 }
1251 })
1252
1253 ginkgo.It("should work for type=LoadBalancer", func(ctx context.Context) {
1254 namespace := f.Namespace.Name
1255 serviceName := "external-local-lb"
1256 jig := e2eservice.NewTestJig(cs, namespace, serviceName)
1257
1258 svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
1259 framework.ExpectNoError(err)
1260 healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
1261 if healthCheckNodePort == 0 {
1262 framework.Failf("Service HealthCheck NodePort was not allocated")
1263 }
1264 ginkgo.DeferCleanup(func(ctx context.Context) {
1265 err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
1266 framework.ExpectNoError(err)
1267
1268
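// After the Service reverts to ClusterIP, its health-check NodePort should stop
// serving; probe it from the test container and expect the checks to fail before
// deleting the Service.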
1269 const threshold = 2
1270 nodes, err := getEndpointNodesWithInternalIP(ctx, jig)
1271 framework.ExpectNoError(err)
1272 config := e2enetwork.NewNetworkingTestConfig(ctx, f)
1273 for _, internalIP := range nodes {
1274 err := testHTTPHealthCheckNodePortFromTestContainer(ctx,
1275 config,
1276 internalIP,
1277 healthCheckNodePort,
1278 e2eservice.KubeProxyLagTimeout,
1279 false,
1280 threshold)
1281 framework.ExpectNoError(err)
1282 }
1283 err = cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
1284 framework.ExpectNoError(err)
1285 })
1286
1287 svcTCPPort := int(svc.Spec.Ports[0].Port)
1288 ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
1289
1290 ginkgo.By("reading clientIP using the TCP service's service port via its external VIP")
1291 clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip")
1292 framework.ExpectNoError(err)
1293 framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIPPort)
1294
1295 ginkgo.By("checking if Source IP is preserved")
1296
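// With externalTrafficPolicy: Local the backend should report the original client
// address; an address inside the node subnet would mean the traffic was SNATed
// through a node and the source IP was lost.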
1297 host, _, err := net.SplitHostPort(clientIPPort)
1298 if err != nil {
1299 framework.Failf("SplitHostPort returned unexpected error: %q", clientIPPort)
1300 }
1301 ip := netutils.ParseIPSloppy(host)
1302 if ip == nil {
1303 framework.Failf("Invalid client IP address format: %q", host)
1304 }
1305 if subnetPrefix.Contains(ip) {
1306 framework.Failf("Source IP was NOT preserved")
1307 }
1308 })
1309
1310 ginkgo.It("should work for type=NodePort", func(ctx context.Context) {
1311 namespace := f.Namespace.Name
1312 serviceName := "external-local-nodeport"
1313 jig := e2eservice.NewTestJig(cs, namespace, serviceName)
1314
1315 svc, err := jig.CreateOnlyLocalNodePortService(ctx, true)
1316 framework.ExpectNoError(err)
1317 ginkgo.DeferCleanup(func(ctx context.Context) {
1318 err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
1319 framework.ExpectNoError(err)
1320 })
1321
1322 tcpNodePort := int(svc.Spec.Ports[0].NodePort)
1323
1324 endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig)
1325 framework.ExpectNoError(err)
1326
1327 dialCmd := "clientip"
1328 config := e2enetwork.NewNetworkingTestConfig(ctx, f)
1329
1330 for nodeName, nodeIP := range endpointsNodeMap {
1331 ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd))
1332 clientIP, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
1333 framework.ExpectNoError(err)
1334 framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP)
1335
1336 if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) {
1337 framework.Failf("Source IP was NOT preserved")
1338 }
1339 }
1340 })
1341
1342 ginkgo.It("should only target nodes with endpoints", func(ctx context.Context) {
1343 namespace := f.Namespace.Name
1344 serviceName := "external-local-nodes"
1345 jig := e2eservice.NewTestJig(cs, namespace, serviceName)
1346 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
1347 framework.ExpectNoError(err)
1348
1349 svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, false,
1350 func(svc *v1.Service) {
1351
1352
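// Move the service port to 8081 (keeping the original port as targetPort),
// presumably to avoid clashing with ports used by other e2e tests running in
// parallel on the same nodes.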
1353 if len(svc.Spec.Ports) != 0 {
1354 svc.Spec.Ports[0].TargetPort = intstr.FromInt32(svc.Spec.Ports[0].Port)
1355 svc.Spec.Ports[0].Port = 8081
1356 }
1357
1358 })
1359 framework.ExpectNoError(err)
1360 ginkgo.DeferCleanup(func(ctx context.Context) {
1361 err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
1362 framework.ExpectNoError(err)
1363 err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
1364 framework.ExpectNoError(err)
1365 })
1366
1367 healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
1368 if healthCheckNodePort == 0 {
1369 framework.Failf("Service HealthCheck NodePort was not allocated")
1370 }
1371
1372 ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
1373
1374 ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
1375 svcTCPPort := int(svc.Spec.Ports[0].Port)
1376
1377 const threshold = 2
1378 config := e2enetwork.NewNetworkingTestConfig(ctx, f)
1379 for i := 0; i < len(nodes.Items); i++ {
1380 endpointNodeName := nodes.Items[i].Name
1381
1382 ginkgo.By("creating a pod to be part of the service " + serviceName + " on node " + endpointNodeName)
1383 _, err = jig.Run(ctx, func(rc *v1.ReplicationController) {
1384 rc.Name = serviceName
1385 if endpointNodeName != "" {
1386 rc.Spec.Template.Spec.NodeName = endpointNodeName
1387 }
1388 })
1389 framework.ExpectNoError(err)
1390
1391 ginkgo.By(fmt.Sprintf("waiting for service endpoint on node %v", endpointNodeName))
1392 err = jig.WaitForEndpointOnNode(ctx, endpointNodeName)
1393 framework.ExpectNoError(err)
1394
1395
1396
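// With the endpoint pinned to a single node, only that node's health-check
// NodePort is expected to report healthy; every other node should fail the probe.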
1397 for n, internalIP := range ips {
1398
1399
1400 e2eservice.TestReachableHTTP(ctx, ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout)
1401 expectedSuccess := nodes.Items[n].Name == endpointNodeName
1402 port := strconv.Itoa(healthCheckNodePort)
1403 ipPort := net.JoinHostPort(internalIP, port)
1404 framework.Logf("Health checking %s, http://%s/healthz, expectedSuccess %v", nodes.Items[n].Name, ipPort, expectedSuccess)
1405 err := testHTTPHealthCheckNodePortFromTestContainer(ctx,
1406 config,
1407 internalIP,
1408 healthCheckNodePort,
1409 e2eservice.KubeProxyEndpointLagTimeout,
1410 expectedSuccess,
1411 threshold)
1412 framework.ExpectNoError(err)
1413 }
1414 framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, namespace, serviceName))
1415 }
1416 })
1417
1418 ginkgo.It("should work from pods", func(ctx context.Context) {
1419 var err error
1420 namespace := f.Namespace.Name
1421 serviceName := "external-local-pods"
1422 jig := e2eservice.NewTestJig(cs, namespace, serviceName)
1423
1424 svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
1425 framework.ExpectNoError(err)
1426 ginkgo.DeferCleanup(func(ctx context.Context) {
1427 err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
1428 framework.ExpectNoError(err)
1429 err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
1430 framework.ExpectNoError(err)
1431 })
1432
1433 ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
1434 port := strconv.Itoa(int(svc.Spec.Ports[0].Port))
1435 ipPort := net.JoinHostPort(ingressIP, port)
1436 path := fmt.Sprintf("%s/clientip", ipPort)
1437
1438 ginkgo.By("Creating pause pod deployment to make sure, pausePods are in desired state")
1439 deployment := createPausePodDeployment(ctx, cs, "pause-pod-deployment", namespace, 1)
1440 framework.ExpectNoError(e2edeployment.WaitForDeploymentComplete(cs, deployment), "Failed to complete pause pod deployment")
1441
1442 ginkgo.DeferCleanup(func(ctx context.Context) {
1443 framework.Logf("Deleting deployment")
1444 err = cs.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{})
1445 framework.ExpectNoError(err, "Failed to delete deployment %s", deployment.Name)
1446 })
1447
1448 deployment, err = cs.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
1449 framework.ExpectNoError(err, "Error in retrieving pause pod deployment")
1450 labelSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
1451 framework.ExpectNoError(err, "Error in setting LabelSelector as selector from deployment")
1452
1453 pausePods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()})
1454 framework.ExpectNoError(err, "Error in listing pods associated with pause pod deployments")
1455
1456 pausePod := pausePods.Items[0]
1457 framework.Logf("Waiting up to %v curl %v", e2eservice.KubeProxyLagTimeout, path)
1458 cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %v`, path)
1459
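// Curl the LB's /clientip endpoint from the pause pod until the reported source
// address equals the pod's own IP, i.e. the pod's source IP survives the round
// trip through the external load balancer.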
1460 var srcIP string
1461 loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)
1462 ginkgo.By(fmt.Sprintf("Hitting external lb %v from pod %v on node %v", ingressIP, pausePod.Name, pausePod.Spec.NodeName))
1463 if pollErr := wait.PollImmediate(framework.Poll, loadBalancerPropagationTimeout, func() (bool, error) {
1464 stdout, err := e2eoutput.RunHostCmd(pausePod.Namespace, pausePod.Name, cmd)
1465 if err != nil {
1466 framework.Logf("got err: %v, retry until timeout", err)
1467 return false, nil
1468 }
1469 srcIP = strings.TrimSpace(strings.Split(stdout, ":")[0])
1470 return srcIP == pausePod.Status.PodIP, nil
1471 }); pollErr != nil {
1472 framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", pausePod.Name, pausePod.Status.PodIP, srcIP)
1473 }
1474 })
1475
1476 ginkgo.It("should handle updates to ExternalTrafficPolicy field", func(ctx context.Context) {
1477 namespace := f.Namespace.Name
1478 serviceName := "external-local-update"
1479 jig := e2eservice.NewTestJig(cs, namespace, serviceName)
1480
1481 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
1482 framework.ExpectNoError(err)
1483 if len(nodes.Items) < 2 {
1484 framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
1485 }
1486
1487 svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
1488 framework.ExpectNoError(err)
1489 ginkgo.DeferCleanup(func(ctx context.Context) {
1490 err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
1491 framework.ExpectNoError(err)
1492 err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
1493 framework.ExpectNoError(err)
1494 })
1495
1496
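// Save the allocated health check node port: it is cleared when ExternalTrafficPolicy is
// switched to Cluster, and is requested again when switching back to Local.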
1497 healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
1498
1499 ginkgo.By("turning ESIPP off")
1500 svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
1501 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
1502 })
1503 framework.ExpectNoError(err)
1504 if svc.Spec.HealthCheckNodePort > 0 {
1505 framework.Failf("Service HealthCheck NodePort still present")
1506 }
1507
1508 epNodes, err := jig.ListNodesWithEndpoint(ctx)
1509 framework.ExpectNoError(err)
1510
1511
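// endpointNodeMap: nodes hosting at least one endpoint of the service, keyed by node name to internal IP.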
1512 endpointNodeMap := make(map[string]string)
1513
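// noEndpointNodeMap: schedulable nodes without any endpoint of the service, keyed by node name to internal IP.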
1514 noEndpointNodeMap := make(map[string]string)
1515 for _, node := range epNodes {
1516 ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
1517 if len(ips) < 1 {
1518 framework.Failf("No internal ip found for node %s", node.Name)
1519 }
1520 endpointNodeMap[node.Name] = ips[0]
1521 }
1522 for _, n := range nodes.Items {
1523 ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
1524 if len(ips) < 1 {
1525 framework.Failf("No internal ip found for node %s", n.Name)
1526 }
1527 if _, ok := endpointNodeMap[n.Name]; !ok {
1528 noEndpointNodeMap[n.Name] = ips[0]
1529 }
1530 }
1531 gomega.Expect(endpointNodeMap).ToNot(gomega.BeEmpty())
1532 gomega.Expect(noEndpointNodeMap).ToNot(gomega.BeEmpty())
1533
1534 svcTCPPort := int(svc.Spec.Ports[0].Port)
1535 svcNodePort := int(svc.Spec.Ports[0].NodePort)
1536 ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
1537 path := "/clientip"
1538 dialCmd := "clientip"
1539
1540 config := e2enetwork.NewNetworkingTestConfig(ctx, f)
1541
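// With ExternalTrafficPolicy=Cluster, a node without local endpoints must still proxy NodePort traffic to endpoints on other nodes.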
1542 ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
1543 for nodeName, nodeIP := range noEndpointNodeMap {
1544 ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd))
1545 _, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
1546 framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
1547 }
1548
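// With the Local policy gone, kube-proxy should stop serving the health check node port, even on nodes that do host endpoints.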
1549 for nodeName, nodeIP := range endpointNodeMap {
1550 ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
1551 var body string
1552 pollFn := func() (bool, error) {
1553
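// An error here means the exec against the test container failed, so keep polling.
// A populated resp.Errors means the dial to the health check port failed, i.e. kube-proxy no longer serves it.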
1554 resp, err := config.GetResponseFromTestContainer(ctx,
1555 "http",
1556 "healthz",
1557 nodeIP,
1558 healthCheckNodePort)
1559 if err != nil {
1560 return false, nil
1561 }
1562 if len(resp.Errors) > 0 {
1563 return true, nil
1564 }
1565 if len(resp.Responses) > 0 {
1566 body = resp.Responses[0]
1567 }
1568 return false, nil
1569 }
1570 if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollFn); pollErr != nil {
1571 framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s",
1572 nodeName, healthCheckNodePort, body)
1573 }
1574 }
1575
1576
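// With the Cluster policy, traffic through the load balancer is SNAT'd, so the backend should
// see a source IP from the node subnet instead of the original client IP.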
1577 ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP))
1578 var clientIP string
1579 pollErr := wait.PollImmediate(framework.Poll, 3*e2eservice.KubeProxyLagTimeout, func() (bool, error) {
1580 clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
1581 if err != nil {
1582 return false, nil
1583 }
1584
1585 host, _, err := net.SplitHostPort(clientIPPort)
1586 if err != nil {
1587 framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
1588 return false, nil
1589 }
1590 ip := netutils.ParseIPSloppy(host)
1591 if ip == nil {
1592 framework.Logf("Invalid client IP address format: %q", host)
1593 return false, nil
1594 }
1595 if clientIP = host; subnetPrefix.Contains(ip) { // remember the observed source IP for the failure message
1596 return true, nil
1597 }
1598 return false, nil
1599 })
1600 if pollErr != nil {
1601 framework.Failf("Source IP WAS preserved even after ESIPP turned off. Got %v, expected a ten-dot cluster ip.", clientIP)
1602 }
1603
1604
1605
1606
1607
1608
1609
1610 ginkgo.By("setting ExternalTraffic field back to OnlyLocal")
1611 svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
1612 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
1613
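// Request the same health check node port that was allocated before the policy change.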
1614 svc.Spec.HealthCheckNodePort = int32(healthCheckNodePort)
1615 })
1616 framework.ExpectNoError(err)
1617 loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)
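// Poll until the backend reports a source IP outside the node subnet, i.e. the original client IP is preserved again.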
1618 pollErr = wait.PollImmediate(framework.PollShortTimeout, loadBalancerPropagationTimeout, func() (bool, error) {
1619 clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
1620 if err != nil {
1621 return false, nil
1622 }
1623 ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIPPort))
1624
1625 host, _, err := net.SplitHostPort(clientIPPort)
1626 if err != nil {
1627 framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
1628 return false, nil
1629 }
1630 ip := netutils.ParseIPSloppy(host)
1631 if ip == nil {
1632 framework.Logf("Invalid client IP address format: %q", host)
1633 return false, nil
1634 }
1635 if clientIP = host; !subnetPrefix.Contains(ip) { // remember the observed source IP for the failure message
1636 return true, nil
1637 }
1638 return false, nil
1639 })
1640 if pollErr != nil {
1641 framework.Failf("Source IP (%v) is not the client IP even after ESIPP turned on, expected a public IP.", clientIP)
1642 }
1643 })
1644 })
1645
1646 func testRollingUpdateLBConnectivityDisruption(ctx context.Context, f *framework.Framework, externalTrafficPolicy v1.ServiceExternalTrafficPolicyType, minSuccessRate float64) {
1647 cs := f.ClientSet
1648 ns := f.Namespace.Name
1649 name := "test-lb-rolling-update"
1650 labels := map[string]string{"name": name}
1651 gracePeriod := int64(60)
1652 maxUnavailable := intstr.FromString("10%")
1653 ds := e2edaemonset.NewDaemonSet(name, e2eapps.AgnhostImage, labels, nil, nil,
1654 []v1.ContainerPort{
1655 {ContainerPort: 80},
1656 },
1657 "netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod),
1658 )
1659 ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
1660 Type: appsv1.RollingUpdateDaemonSetStrategyType,
1661 RollingUpdate: &appsv1.RollingUpdateDaemonSet{
1662 MaxUnavailable: &maxUnavailable,
1663 },
1664 }
1665 ds.Spec.Template.Labels = labels
1666 ds.Spec.Template.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
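// The delayed shutdown matches the termination grace period, so pods keep serving while they are
// being replaced and the rolling update can proceed without dropping load-balancer traffic.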
1667
1668 nodeNames := e2edaemonset.SchedulableNodes(ctx, cs, ds)
1669 e2eskipper.SkipUnlessAtLeast(len(nodeNames), 2, "load-balancer rolling update test requires at least 2 schedulable nodes for the DaemonSet")
1670 if len(nodeNames) > 25 {
1671 e2eskipper.Skipf("load-balancer rolling update test skipped for large environments with more than 25 nodes")
1672 }
1673
1674 ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", name))
1675 ds, err := cs.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
1676 framework.ExpectNoError(err)
1677
1678 ginkgo.By("Checking that daemon pods launch on every schedulable node of the cluster")
1679 creationTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
1680 err = wait.PollUntilContextTimeout(ctx, framework.Poll, creationTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, nodeNames))
1681 framework.ExpectNoError(err, "error waiting for daemon pods to start")
1682 err = e2edaemonset.CheckDaemonStatus(ctx, f, name)
1683 framework.ExpectNoError(err)
1684
1685 ginkgo.By(fmt.Sprintf("Creating a service %s with type=LoadBalancer externalTrafficPolicy=%s in namespace %s", name, externalTrafficPolicy, ns))
1686 jig := e2eservice.NewTestJig(cs, ns, name)
1687 jig.Labels = labels
1688 service, err := jig.CreateLoadBalancerService(ctx, creationTimeout, func(svc *v1.Service) {
1689 svc.Spec.ExternalTrafficPolicy = externalTrafficPolicy
1690 })
1691 framework.ExpectNoError(err)
1692
1693 lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0])
1694 svcPort := int(service.Spec.Ports[0].Port)
1695
1696 ginkgo.By("Hitting the DaemonSet's pods through the service's load balancer")
1697 timeout := e2eservice.LoadBalancerLagTimeoutDefault
1698 if framework.ProviderIs("aws") {
1699 timeout = e2eservice.LoadBalancerLagTimeoutAWS
1700 }
1701 e2eservice.TestReachableHTTP(ctx, lbNameOrAddress, svcPort, timeout)
1702
1703 ginkgo.By("Starting a goroutine to continuously hit the DaemonSet's pods through the service's load balancer")
1704 var totalRequests uint64 = 0
1705 var networkErrors uint64 = 0
1706 var httpErrors uint64 = 0
1707 done := make(chan struct{})
1708 defer close(done)
1709 go func() {
1710 defer ginkgo.GinkgoRecover()
1711
1712 wait.Until(func() {
1713 atomic.AddUint64(&totalRequests, 1)
1714 client := &http.Client{
1715 Transport: utilnet.SetTransportDefaults(&http.Transport{
1716 DisableKeepAlives: true,
1717 }),
1718 Timeout: 5 * time.Second,
1719 }
1720 ipPort := net.JoinHostPort(lbNameOrAddress, strconv.Itoa(svcPort))
1721 msg := "hello"
1722 url := fmt.Sprintf("http://%s/echo?msg=%s", ipPort, msg)
1723 resp, err := client.Get(url)
1724 if err != nil {
1725 framework.Logf("Got error testing for reachability of %s: %v", url, err)
1726 atomic.AddUint64(&networkErrors, 1)
1727 return
1728 }
1729 defer resp.Body.Close()
1730 if resp.StatusCode != http.StatusOK {
1731 framework.Logf("Got bad status code: %d", resp.StatusCode)
1732 atomic.AddUint64(&httpErrors, 1)
1733 return
1734 }
1735 body, err := io.ReadAll(resp.Body)
1736 if err != nil {
1737 framework.Logf("Got error reading HTTP body: %v", err)
1738 atomic.AddUint64(&httpErrors, 1)
1739 return
1740 }
1741 if string(body) != msg {
1742 framework.Logf("The response body does not contain expected string %s", string(body))
1743 atomic.AddUint64(&httpErrors, 1)
1744 return
1745 }
1746 }, time.Duration(0), done)
1747 }()
1748
1749 ginkgo.By("Triggering DaemonSet rolling update several times")
1750 var previousTotalRequests uint64 = 0
1751 var previousNetworkErrors uint64 = 0
1752 var previousHttpErrors uint64 = 0
1753 for i := 1; i <= 5; i++ {
1754 framework.Logf("Update daemon pods environment: [{\"name\":\"VERSION\",\"value\":\"%d\"}]", i)
1755 patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%d"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, i)
1756 ds, err = cs.AppsV1().DaemonSets(ns).Patch(ctx, name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
1757 framework.ExpectNoError(err)
1758
1759 framework.Logf("Check that daemon pods are available on every node of the cluster with the updated environment.")
1760 err = wait.PollImmediate(framework.Poll, creationTimeout, func() (bool, error) {
1761 podList, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
1762 if err != nil {
1763 return false, err
1764 }
1765 pods := podList.Items
1766
1767 readyPods := 0
1768 for _, pod := range pods {
1769 if !metav1.IsControlledBy(&pod, ds) {
1770 continue
1771 }
1772 if pod.DeletionTimestamp != nil {
1773 continue
1774 }
1775 podVersion := ""
1776 for _, env := range pod.Spec.Containers[0].Env {
1777 if env.Name == "VERSION" {
1778 podVersion = env.Value
1779 break
1780 }
1781 }
1782 if podVersion != fmt.Sprintf("%d", i) {
1783 continue
1784 }
1785 podReady := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
1786 if !podReady {
1787 continue
1788 }
1789 readyPods += 1
1790 }
1791 framework.Logf("Number of running nodes: %d, number of updated ready pods: %d in daemonset %s", len(nodeNames), readyPods, ds.Name)
1792 return readyPods == len(nodeNames), nil
1793 })
1794 framework.ExpectNoError(err, "error waiting for daemon pods to be ready")
1795
1796
1797 currentTotalRequests := atomic.LoadUint64(&totalRequests)
1798 currentNetworkErrors := atomic.LoadUint64(&networkErrors)
1799 currentHttpErrors := atomic.LoadUint64(&httpErrors)
1800
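// Compute the requests and errors accumulated since the previous update and derive this iteration's success rate.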
1801 partialTotalRequests := currentTotalRequests - previousTotalRequests
1802 partialNetworkErrors := currentNetworkErrors - previousNetworkErrors
1803 partialHttpErrors := currentHttpErrors - previousHttpErrors
1804 partialSuccessRate := (float64(partialTotalRequests) - float64(partialNetworkErrors+partialHttpErrors)) / float64(partialTotalRequests)
1805
1806 framework.Logf("Load Balancer total HTTP requests: %d", partialTotalRequests)
1807 framework.Logf("Network errors: %d", partialNetworkErrors)
1808 framework.Logf("HTTP errors: %d", partialHttpErrors)
1809 framework.Logf("Success rate: %.2f%%", partialSuccessRate*100)
1810 if partialSuccessRate < minSuccessRate {
1811 framework.Failf("Encountered too many errors when doing HTTP requests to the load balancer address. Success rate is %.2f%%, and the minimum allowed threshold is %.2f%%.", partialSuccessRate*100, minSuccessRate*100)
1812 }
1813
1814 previousTotalRequests = currentTotalRequests
1815 previousNetworkErrors = currentNetworkErrors
1816 previousHttpErrors = currentHttpErrors
1817 }
1818
1819
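// After all rolling updates have finished, the load balancer address should still be reachable.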
1820 e2eservice.TestReachableHTTP(ctx, lbNameOrAddress, svcPort, timeout)
1821 }
1822