//go:build !providerless // +build !providerless /* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package network import ( "context" "fmt" "io" "math/big" "net" "net/http" "strconv" "strings" "sync" "sync/atomic" "time" compute "google.golang.org/api/compute/v1" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" podutil "k8s.io/kubernetes/pkg/api/v1/pod" e2eapps "k8s.io/kubernetes/test/e2e/apps" "k8s.io/kubernetes/test/e2e/framework" e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset" e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" "k8s.io/kubernetes/test/e2e/framework/providers/gce" e2erc "k8s.io/kubernetes/test/e2e/framework/rc" e2eservice "k8s.io/kubernetes/test/e2e/framework/service" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" "k8s.io/kubernetes/test/e2e/network/common" admissionapi "k8s.io/pod-security-admission/api" netutils "k8s.io/utils/net" utilpointer "k8s.io/utils/pointer" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" ) // getInternalIP returns node internal IP func getInternalIP(node *v1.Node) (string, error) { for _, address := range node.Status.Addresses { if address.Type == v1.NodeInternalIP && address.Address != "" { return address.Address, nil } } return "", fmt.Errorf("couldn't get the internal IP of host %s with addresses %v", node.Name, node.Status.Addresses) } // getSubnetPrefix returns a network prefix based on one of the workers // InternalIP adding a /16 or /64 mask depending on the IP family of the node. // IMPORTANT: These assumes a flat network assigned to the nodes, that is common // on cloud providers. func getSubnetPrefix(ctx context.Context, c clientset.Interface) (*net.IPNet, error) { node, err := getReadySchedulableWorkerNode(ctx, c) if err != nil { return nil, fmt.Errorf("error getting a ready schedulable worker Node, err: %w", err) } internalIP, err := getInternalIP(node) if err != nil { return nil, fmt.Errorf("error getting Node internal IP, err: %w", err) } ip := netutils.ParseIPSloppy(internalIP) if ip == nil { return nil, fmt.Errorf("invalid IP address format: %s", internalIP) } // if IPv6 return a net.IPNet with IP = ip and mask /64 ciderMask := net.CIDRMask(64, 128) // if IPv4 return a net.IPNet with IP = ip and mask /16 if netutils.IsIPv4(ip) { ciderMask = net.CIDRMask(16, 32) } return &net.IPNet{IP: ip.Mask(ciderMask), Mask: ciderMask}, nil } // getReadySchedulableWorkerNode gets a single worker node which is available for // running pods on. If there are no such available nodes it will return an error. func getReadySchedulableWorkerNode(ctx context.Context, c clientset.Interface) (*v1.Node, error) { nodes, err := e2enode.GetReadySchedulableNodes(ctx, c) if err != nil { return nil, err } for i := range nodes.Items { node := nodes.Items[i] _, isMaster := node.Labels["node-role.kubernetes.io/master"] _, isControlPlane := node.Labels["node-role.kubernetes.io/control-plane"] if !isMaster && !isControlPlane { return &node, nil } } return nil, fmt.Errorf("there are currently no ready, schedulable worker nodes in the cluster") } var _ = common.SIGDescribe("LoadBalancers", func() { f := framework.NewDefaultFramework("loadbalancers") f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged var cs clientset.Interface var subnetPrefix *net.IPNet var err error ginkgo.BeforeEach(func(ctx context.Context) { cs = f.ClientSet subnetPrefix, err = getSubnetPrefix(ctx, cs) framework.ExpectNoError(err) }) ginkgo.AfterEach(func(ctx context.Context) { if ginkgo.CurrentSpecReport().Failed() { DescribeSvc(f.Namespace.Name) } }) f.It("should be able to change the type and ports of a TCP service", f.WithSlow(), func(ctx context.Context) { // requires cloud load-balancer support e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws") loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault if framework.ProviderIs("aws") { loadBalancerLagTimeout = e2eservice.LoadBalancerLagTimeoutAWS } loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) // This test is more monolithic than we'd like because LB turnup can be // very slow, so we lumped all the tests into one LB lifecycle. serviceName := "mutability-test" ns1 := f.Namespace.Name // LB1 in ns1 on TCP framework.Logf("namespace for TCP test: %s", ns1) ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1) tcpJig := e2eservice.NewTestJig(cs, ns1, serviceName) tcpService, err := tcpJig.CreateTCPService(ctx, nil) framework.ExpectNoError(err) svcPort := int(tcpService.Spec.Ports[0].Port) framework.Logf("service port TCP: %d", svcPort) ginkgo.By("creating a pod to be part of the TCP service " + serviceName) _, err = tcpJig.Run(ctx, nil) framework.ExpectNoError(err) execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns1, "execpod", nil) err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod) framework.ExpectNoError(err) // Change the services to NodePort. ginkgo.By("changing the TCP service to type=NodePort") tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeNodePort }) framework.ExpectNoError(err) tcpNodePort := int(tcpService.Spec.Ports[0].NodePort) framework.Logf("TCP node port: %d", tcpNodePort) err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod) framework.ExpectNoError(err) // Change the services to LoadBalancer. // Here we test that LoadBalancers can receive static IP addresses. This isn't // necessary, but is an additional feature this monolithic test checks. requestedIP := "" staticIPName := "" if framework.ProviderIs("gce", "gke") { ginkgo.By("creating a static load balancer IP") staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID) gceCloud, err := gce.GetGCECloud() framework.ExpectNoError(err, "failed to get GCE cloud provider") err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region()) defer func() { if staticIPName != "" { // Release GCE static IP - this is not kube-managed and will not be automatically released. if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil { framework.Logf("failed to release static IP %s: %v", staticIPName, err) } } }() framework.ExpectNoError(err, "failed to create region address: %s", staticIPName) reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region()) framework.ExpectNoError(err, "failed to get region address: %s", staticIPName) requestedIP = reservedAddr.Address framework.Logf("Allocated static load balancer IP: %s", requestedIP) } ginkgo.By("changing the TCP service to type=LoadBalancer") _, err = tcpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.LoadBalancerIP = requestedIP // will be "" if not applicable s.Spec.Type = v1.ServiceTypeLoadBalancer }) framework.ExpectNoError(err) ginkgo.By("waiting for the TCP service to have a load balancer") // Wait for the load balancer to be created asynchronously tcpService, err = tcpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout) framework.ExpectNoError(err) if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort { framework.Failf("TCP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", tcpNodePort, tcpService.Spec.Ports[0].NodePort) } if requestedIP != "" && e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP { framework.Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])) } tcpIngressIP := e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) framework.Logf("TCP load balancer: %s", tcpIngressIP) if framework.ProviderIs("gce", "gke") { // Do this as early as possible, which overrides the `defer` above. // This is mostly out of fear of leaking the IP in a timeout case // (as of this writing we're not 100% sure where the leaks are // coming from, so this is first-aid rather than surgery). ginkgo.By("demoting the static IP to ephemeral") if staticIPName != "" { gceCloud, err := gce.GetGCECloud() framework.ExpectNoError(err, "failed to get GCE cloud provider") // Deleting it after it is attached "demotes" it to an // ephemeral IP, which can be auto-released. if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil { framework.Failf("failed to release static IP %s: %v", staticIPName, err) } staticIPName = "" } } err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod) framework.ExpectNoError(err) ginkgo.By("hitting the TCP service's LoadBalancer") e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout) // Change the services' node ports. ginkgo.By("changing the TCP service's NodePort") tcpService, err = tcpJig.ChangeServiceNodePort(ctx, tcpNodePort) framework.ExpectNoError(err) tcpNodePortOld := tcpNodePort tcpNodePort = int(tcpService.Spec.Ports[0].NodePort) if tcpNodePort == tcpNodePortOld { framework.Failf("TCP Spec.Ports[0].NodePort (%d) did not change", tcpNodePort) } if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP { framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])) } framework.Logf("TCP node port: %d", tcpNodePort) ginkgo.By("hitting the TCP service's LoadBalancer") e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout) // Change the services' main ports. ginkgo.By("changing the TCP service's port") tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.Ports[0].Port++ }) framework.ExpectNoError(err) svcPortOld := svcPort svcPort = int(tcpService.Spec.Ports[0].Port) if svcPort == svcPortOld { framework.Failf("TCP Spec.Ports[0].Port (%d) did not change", svcPort) } if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort { framework.Failf("TCP Spec.Ports[0].NodePort (%d) changed", tcpService.Spec.Ports[0].NodePort) } if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP { framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])) } framework.Logf("service port TCP: %d", svcPort) ginkgo.By("hitting the TCP service's LoadBalancer") e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout) ginkgo.By("Scaling the pods to 0") err = tcpJig.Scale(ctx, 0) framework.ExpectNoError(err) ginkgo.By("looking for ICMP REJECT on the TCP service's LoadBalancer") testRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) ginkgo.By("Scaling the pods to 1") err = tcpJig.Scale(ctx, 1) framework.ExpectNoError(err) ginkgo.By("hitting the TCP service's LoadBalancer") e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout) // Change the services back to ClusterIP. ginkgo.By("changing TCP service back to type=ClusterIP") tcpReadback, err := tcpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeClusterIP }) framework.ExpectNoError(err) if tcpReadback.Spec.Ports[0].NodePort != 0 { framework.Fail("TCP Spec.Ports[0].NodePort was not cleared") } // Wait for the load balancer to be destroyed asynchronously _, err = tcpJig.WaitForLoadBalancerDestroy(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout) framework.ExpectNoError(err) ginkgo.By("checking the TCP LoadBalancer is closed") testNotReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout) }) f.It("should be able to change the type and ports of a UDP service", f.WithSlow(), func(ctx context.Context) { // requires cloud load-balancer support e2eskipper.SkipUnlessProviderIs("gce", "gke") loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) // This test is more monolithic than we'd like because LB turnup can be // very slow, so we lumped all the tests into one LB lifecycle. serviceName := "mutability-test" ns2 := f.Namespace.Name // LB1 in ns2 on TCP framework.Logf("namespace for TCP test: %s", ns2) ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in namespace " + ns2) udpJig := e2eservice.NewTestJig(cs, ns2, serviceName) udpService, err := udpJig.CreateUDPService(ctx, nil) framework.ExpectNoError(err) svcPort := int(udpService.Spec.Ports[0].Port) framework.Logf("service port UDP: %d", svcPort) ginkgo.By("creating a pod to be part of the UDP service " + serviceName) _, err = udpJig.Run(ctx, nil) framework.ExpectNoError(err) execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns2, "execpod", nil) err = udpJig.CheckServiceReachability(ctx, udpService, execPod) framework.ExpectNoError(err) // Change the services to NodePort. ginkgo.By("changing the UDP service to type=NodePort") udpService, err = udpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeNodePort }) framework.ExpectNoError(err) udpNodePort := int(udpService.Spec.Ports[0].NodePort) framework.Logf("UDP node port: %d", udpNodePort) err = udpJig.CheckServiceReachability(ctx, udpService, execPod) framework.ExpectNoError(err) // Change the services to LoadBalancer. // Here we test that LoadBalancers can receive static IP addresses. This isn't // necessary, but is an additional feature this monolithic test checks. requestedIP := "" staticIPName := "" ginkgo.By("creating a static load balancer IP") staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID) gceCloud, err := gce.GetGCECloud() framework.ExpectNoError(err, "failed to get GCE cloud provider") err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region()) defer func() { if staticIPName != "" { // Release GCE static IP - this is not kube-managed and will not be automatically released. if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil { framework.Logf("failed to release static IP %s: %v", staticIPName, err) } } }() framework.ExpectNoError(err, "failed to create region address: %s", staticIPName) reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region()) framework.ExpectNoError(err, "failed to get region address: %s", staticIPName) requestedIP = reservedAddr.Address framework.Logf("Allocated static load balancer IP: %s", requestedIP) ginkgo.By("changing the UDP service to type=LoadBalancer") _, err = udpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeLoadBalancer }) framework.ExpectNoError(err) // Do this as early as possible, which overrides the `defer` above. // This is mostly out of fear of leaking the IP in a timeout case // (as of this writing we're not 100% sure where the leaks are // coming from, so this is first-aid rather than surgery). ginkgo.By("demoting the static IP to ephemeral") if staticIPName != "" { gceCloud, err := gce.GetGCECloud() framework.ExpectNoError(err, "failed to get GCE cloud provider") // Deleting it after it is attached "demotes" it to an // ephemeral IP, which can be auto-released. if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil { framework.Failf("failed to release static IP %s: %v", staticIPName, err) } staticIPName = "" } var udpIngressIP string ginkgo.By("waiting for the UDP service to have a load balancer") // 2nd one should be faster since they ran in parallel. udpService, err = udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout) framework.ExpectNoError(err) if int(udpService.Spec.Ports[0].NodePort) != udpNodePort { framework.Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort) } udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) framework.Logf("UDP load balancer: %s", udpIngressIP) err = udpJig.CheckServiceReachability(ctx, udpService, execPod) framework.ExpectNoError(err) ginkgo.By("hitting the UDP service's LoadBalancer") testReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout) // Change the services' node ports. ginkgo.By("changing the UDP service's NodePort") udpService, err = udpJig.ChangeServiceNodePort(ctx, udpNodePort) framework.ExpectNoError(err) udpNodePortOld := udpNodePort udpNodePort = int(udpService.Spec.Ports[0].NodePort) if udpNodePort == udpNodePortOld { framework.Failf("UDP Spec.Ports[0].NodePort (%d) did not change", udpNodePort) } if e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP { framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])) } framework.Logf("UDP node port: %d", udpNodePort) err = udpJig.CheckServiceReachability(ctx, udpService, execPod) framework.ExpectNoError(err) ginkgo.By("hitting the UDP service's LoadBalancer") testReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout) // Change the services' main ports. ginkgo.By("changing the UDP service's port") udpService, err = udpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.Ports[0].Port++ }) framework.ExpectNoError(err) svcPortOld := svcPort svcPort = int(udpService.Spec.Ports[0].Port) if svcPort == svcPortOld { framework.Failf("UDP Spec.Ports[0].Port (%d) did not change", svcPort) } if int(udpService.Spec.Ports[0].NodePort) != udpNodePort { framework.Failf("UDP Spec.Ports[0].NodePort (%d) changed", udpService.Spec.Ports[0].NodePort) } if e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP { framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])) } framework.Logf("service port UDP: %d", svcPort) ginkgo.By("hitting the UDP service's NodePort") err = udpJig.CheckServiceReachability(ctx, udpService, execPod) framework.ExpectNoError(err) ginkgo.By("hitting the UDP service's LoadBalancer") testReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) ginkgo.By("Scaling the pods to 0") err = udpJig.Scale(ctx, 0) framework.ExpectNoError(err) ginkgo.By("looking for ICMP REJECT on the UDP service's LoadBalancer") testRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) ginkgo.By("Scaling the pods to 1") err = udpJig.Scale(ctx, 1) framework.ExpectNoError(err) ginkgo.By("hitting the UDP service's NodePort") err = udpJig.CheckServiceReachability(ctx, udpService, execPod) framework.ExpectNoError(err) ginkgo.By("hitting the UDP service's LoadBalancer") testReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // Change the services back to ClusterIP. ginkgo.By("changing UDP service back to type=ClusterIP") udpReadback, err := udpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeClusterIP }) framework.ExpectNoError(err) if udpReadback.Spec.Ports[0].NodePort != 0 { framework.Fail("UDP Spec.Ports[0].NodePort was not cleared") } // Wait for the load balancer to be destroyed asynchronously _, err = udpJig.WaitForLoadBalancerDestroy(ctx, udpIngressIP, svcPort, loadBalancerCreateTimeout) framework.ExpectNoError(err) ginkgo.By("checking the UDP LoadBalancer is closed") testNotReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout) }) f.It("should only allow access from service loadbalancer source ranges", f.WithSlow(), func(ctx context.Context) { // this feature currently supported only on GCE/GKE/AWS/AZURE e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws", "azure") loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) namespace := f.Namespace.Name serviceName := "lb-sourcerange" jig := e2eservice.NewTestJig(cs, namespace, serviceName) ginkgo.By("Prepare allow source ips") // prepare the exec pods // acceptPod are allowed to access the loadbalancer acceptPod := e2epod.CreateExecPodOrFail(ctx, cs, namespace, "execpod-accept", nil) dropPod := e2epod.CreateExecPodOrFail(ctx, cs, namespace, "execpod-drop", nil) ginkgo.By("creating a pod to be part of the service " + serviceName) // This container is an nginx container listening on port 80 // See kubernetes/contrib/ingress/echoheaders/nginx.conf for content of response _, err := jig.Run(ctx, nil) framework.ExpectNoError(err) // Make sure acceptPod is running. There are certain chances that pod might be terminated due to unexpected reasons. acceptPod, err = cs.CoreV1().Pods(namespace).Get(ctx, acceptPod.Name, metav1.GetOptions{}) framework.ExpectNoError(err, "Unable to get pod %s", acceptPod.Name) gomega.Expect(acceptPod.Status.Phase).To(gomega.Equal(v1.PodRunning)) gomega.Expect(acceptPod.Status.PodIP).ToNot(gomega.BeEmpty()) // Create loadbalancer service with source range from node[0] and podAccept svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.LoadBalancerSourceRanges = []string{acceptPod.Status.PodIP + "/32"} }) framework.ExpectNoError(err) ginkgo.DeferCleanup(func(ctx context.Context) { ginkgo.By("Clean up loadbalancer service") e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name) }) svc, err = jig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout) framework.ExpectNoError(err) ginkgo.By("check reachability from different sources") svcIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) // We should wait until service changes are actually propagated in the cloud-provider, // as this may take significant amount of time, especially in large clusters. // However, the information whether it was already programmed isn't achievable. // So we're resolving it by using loadBalancerCreateTimeout that takes cluster size into account. checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP) checkReachabilityFromPod(false, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP) // Make sure dropPod is running. There are certain chances that the pod might be terminated due to unexpected reasons. dropPod, err = cs.CoreV1().Pods(namespace).Get(ctx, dropPod.Name, metav1.GetOptions{}) framework.ExpectNoError(err, "Unable to get pod %s", dropPod.Name) gomega.Expect(acceptPod.Status.Phase).To(gomega.Equal(v1.PodRunning)) gomega.Expect(acceptPod.Status.PodIP).ToNot(gomega.BeEmpty()) ginkgo.By("Update service LoadBalancerSourceRange and check reachability") _, err = jig.UpdateService(ctx, func(svc *v1.Service) { // only allow access from dropPod svc.Spec.LoadBalancerSourceRanges = []string{dropPod.Status.PodIP + "/32"} }) framework.ExpectNoError(err) // We should wait until service changes are actually propagates, as this may take // significant amount of time, especially in large clusters. // However, the information whether it was already programmed isn't achievable. // So we're resolving it by using loadBalancerCreateTimeout that takes cluster size into account. checkReachabilityFromPod(false, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP) checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP) ginkgo.By("Delete LoadBalancerSourceRange field and check reachability") _, err = jig.UpdateService(ctx, func(svc *v1.Service) { svc.Spec.LoadBalancerSourceRanges = nil }) framework.ExpectNoError(err) // We should wait until service changes are actually propagates, as this may take // significant amount of time, especially in large clusters. // However, the information whether it was already programmed isn't achievable. // So we're resolving it by using loadBalancerCreateTimeout that takes cluster size into account. checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP) checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP) }) f.It("should be able to create an internal type load balancer", f.WithSlow(), func(ctx context.Context) { e2eskipper.SkipUnlessProviderIs("gke", "gce") createTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) pollInterval := framework.Poll * 10 namespace := f.Namespace.Name serviceName := "lb-internal" jig := e2eservice.NewTestJig(cs, namespace, serviceName) ginkgo.By("creating pod to be part of service " + serviceName) _, err := jig.Run(ctx, nil) framework.ExpectNoError(err) enableILB, disableILB := enableAndDisableInternalLB() isInternalEndpoint := func(lbIngress *v1.LoadBalancerIngress) bool { ingressEndpoint := e2eservice.GetIngressPoint(lbIngress) ingressIP := netutils.ParseIPSloppy(ingressEndpoint) if ingressIP == nil { framework.Failf("invalid ingressEndpoint IP address format: %s", ingressEndpoint) } // Needs update for providers using hostname as endpoint. return subnetPrefix.Contains(ingressIP) } ginkgo.By("creating a service with type LoadBalancer and cloud specific Internal-LB annotation enabled") svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer enableILB(svc) }) framework.ExpectNoError(err) ginkgo.DeferCleanup(func(ctx context.Context) { ginkgo.By("Clean up loadbalancer service") e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name) }) svc, err = jig.WaitForLoadBalancer(ctx, createTimeout) framework.ExpectNoError(err) lbIngress := &svc.Status.LoadBalancer.Ingress[0] svcPort := int(svc.Spec.Ports[0].Port) // should have an internal IP. if !isInternalEndpoint(lbIngress) { framework.Failf("lbIngress %v doesn't have an internal IP", lbIngress) } // ILBs are not accessible from the test orchestrator, so it's necessary to use // a pod to test the service. ginkgo.By("hitting the internal load balancer from pod") framework.Logf("creating pod with host network") hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "ilb-host-exec") framework.Logf("Waiting up to %v for service %q's internal LB to respond to requests", createTimeout, serviceName) tcpIngressIP := e2eservice.GetIngressPoint(lbIngress) if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) { cmd := fmt.Sprintf(`curl -m 5 'http://%v:%v/echo?msg=hello'`, tcpIngressIP, svcPort) stdout, err := e2eoutput.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd) if err != nil { framework.Logf("error curling; stdout: %v. err: %v", stdout, err) return false, nil } if !strings.Contains(stdout, "hello") { framework.Logf("Expected output to contain 'hello', got %q; retrying...", stdout) return false, nil } framework.Logf("Successful curl; stdout: %v", stdout) return true, nil }); pollErr != nil { framework.Failf("ginkgo.Failed to hit ILB IP, err: %v", pollErr) } ginkgo.By("switching to external type LoadBalancer") svc, err = jig.UpdateService(ctx, func(svc *v1.Service) { disableILB(svc) }) framework.ExpectNoError(err) framework.Logf("Waiting up to %v for service %q to have an external LoadBalancer", createTimeout, serviceName) if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) { svc, err := cs.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{}) if err != nil { return false, err } lbIngress = &svc.Status.LoadBalancer.Ingress[0] return !isInternalEndpoint(lbIngress), nil }); pollErr != nil { framework.Failf("Loadbalancer IP not changed to external.") } // should have an external IP. gomega.Expect(isInternalEndpoint(lbIngress)).To(gomega.BeFalse()) ginkgo.By("hitting the external load balancer") framework.Logf("Waiting up to %v for service %q's external LB to respond to requests", createTimeout, serviceName) tcpIngressIP = e2eservice.GetIngressPoint(lbIngress) e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, e2eservice.LoadBalancerLagTimeoutDefault) // GCE cannot test a specific IP because the test may not own it. This cloud specific condition // will be removed when GCP supports similar functionality. if framework.ProviderIs("azure") { ginkgo.By("switching back to interal type LoadBalancer, with static IP specified.") // For a cluster created with CAPZ, node-subnet may not be "10.240.0.0/16", e.g. "10.1.0.0/16". base := netutils.BigForIP(subnetPrefix.IP) offset := big.NewInt(0).SetBytes(netutils.ParseIPSloppy("0.0.11.11").To4()).Int64() internalStaticIP := netutils.AddIPOffset(base, int(offset)).String() svc, err = jig.UpdateService(ctx, func(svc *v1.Service) { svc.Spec.LoadBalancerIP = internalStaticIP enableILB(svc) }) framework.ExpectNoError(err) framework.Logf("Waiting up to %v for service %q to have an internal LoadBalancer", createTimeout, serviceName) if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) { svc, err := cs.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{}) if err != nil { return false, err } lbIngress = &svc.Status.LoadBalancer.Ingress[0] return isInternalEndpoint(lbIngress), nil }); pollErr != nil { framework.Failf("Loadbalancer IP not changed to internal.") } // should have the given static internal IP. gomega.Expect(e2eservice.GetIngressPoint(lbIngress)).To(gomega.Equal(internalStaticIP)) } }) // [LinuxOnly]: Windows does not support session affinity. f.It("should have session affinity work for LoadBalancer service with ESIPP on", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) { // L4 load balancer affinity `ClientIP` is not supported on AWS ELB. e2eskipper.SkipIfProviderIs("aws") svc := getServeHostnameService("affinity-lb-esipp") svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal execAffinityTestForLBService(ctx, f, cs, svc) }) // [LinuxOnly]: Windows does not support session affinity. f.It("should be able to switch session affinity for LoadBalancer service with ESIPP on", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) { // L4 load balancer affinity `ClientIP` is not supported on AWS ELB. e2eskipper.SkipIfProviderIs("aws") svc := getServeHostnameService("affinity-lb-esipp-transition") svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal execAffinityTestForLBServiceWithTransition(ctx, f, cs, svc) }) // [LinuxOnly]: Windows does not support session affinity. f.It("should have session affinity work for LoadBalancer service with ESIPP off", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) { // L4 load balancer affinity `ClientIP` is not supported on AWS ELB. e2eskipper.SkipIfProviderIs("aws") svc := getServeHostnameService("affinity-lb") svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster execAffinityTestForLBService(ctx, f, cs, svc) }) // [LinuxOnly]: Windows does not support session affinity. f.It("should be able to switch session affinity for LoadBalancer service with ESIPP off", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) { // L4 load balancer affinity `ClientIP` is not supported on AWS ELB. e2eskipper.SkipIfProviderIs("aws") svc := getServeHostnameService("affinity-lb-transition") svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster execAffinityTestForLBServiceWithTransition(ctx, f, cs, svc) }) // This test verifies if service load balancer cleanup finalizer is properly // handled during service lifecycle. // 1. Create service with type=LoadBalancer. Finalizer should be added. // 2. Update service to type=ClusterIP. Finalizer should be removed. // 3. Update service to type=LoadBalancer. Finalizer should be added. // 4. Delete service with type=LoadBalancer. Finalizer should be removed. f.It("should handle load balancer cleanup finalizer for service", f.WithSlow(), func(ctx context.Context) { jig := e2eservice.NewTestJig(cs, f.Namespace.Name, "lb-finalizer") ginkgo.By("Create load balancer service") svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer }) framework.ExpectNoError(err) ginkgo.DeferCleanup(func(ctx context.Context) { ginkgo.By("Check that service can be deleted with finalizer") e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name) }) ginkgo.By("Wait for load balancer to serve traffic") svc, err = jig.WaitForLoadBalancer(ctx, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)) framework.ExpectNoError(err) ginkgo.By("Check if finalizer presents on service with type=LoadBalancer") e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, true) ginkgo.By("Check if finalizer is removed on service after changed to type=ClusterIP") err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)) framework.ExpectNoError(err) e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, false) ginkgo.By("Check if finalizer is added back to service after changed to type=LoadBalancer") err = jig.ChangeServiceType(ctx, v1.ServiceTypeLoadBalancer, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)) framework.ExpectNoError(err) e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, true) }) f.It("should be able to create LoadBalancer Service without NodePort and change it", f.WithSlow(), func(ctx context.Context) { // requires cloud load-balancer support e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws") loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault if framework.ProviderIs("aws") { loadBalancerLagTimeout = e2eservice.LoadBalancerLagTimeoutAWS } loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) // This test is more monolithic than we'd like because LB turnup can be // very slow, so we lumped all the tests into one LB lifecycle. serviceName := "reallocate-nodeport-test" ns1 := f.Namespace.Name // LB1 in ns1 on TCP framework.Logf("namespace for TCP test: %s", ns1) ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1) tcpJig := e2eservice.NewTestJig(cs, ns1, serviceName) tcpService, err := tcpJig.CreateTCPService(ctx, nil) framework.ExpectNoError(err) svcPort := int(tcpService.Spec.Ports[0].Port) framework.Logf("service port TCP: %d", svcPort) ginkgo.By("creating a pod to be part of the TCP service " + serviceName) _, err = tcpJig.Run(ctx, nil) framework.ExpectNoError(err) // Change the services to LoadBalancer. // Here we test that LoadBalancers can receive static IP addresses. This isn't // necessary, but is an additional feature this monolithic test checks. requestedIP := "" staticIPName := "" if framework.ProviderIs("gce", "gke") { ginkgo.By("creating a static load balancer IP") staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID) gceCloud, err := gce.GetGCECloud() framework.ExpectNoError(err, "failed to get GCE cloud provider") err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region()) ginkgo.DeferCleanup(func(ctx context.Context) { if staticIPName != "" { // Release GCE static IP - this is not kube-managed and will not be automatically released. if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil { framework.Logf("failed to release static IP %s: %v", staticIPName, err) } } }) framework.ExpectNoError(err, "failed to create region address: %s", staticIPName) reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region()) framework.ExpectNoError(err, "failed to get region address: %s", staticIPName) requestedIP = reservedAddr.Address framework.Logf("Allocated static load balancer IP: %s", requestedIP) } ginkgo.By("changing the TCP service to type=LoadBalancer") _, err = tcpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.LoadBalancerIP = requestedIP // will be "" if not applicable s.Spec.Type = v1.ServiceTypeLoadBalancer s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(false) }) framework.ExpectNoError(err) ginkgo.By("waiting for the TCP service to have a load balancer") // Wait for the load balancer to be created asynchronously tcpService, err = tcpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout) framework.ExpectNoError(err) if int(tcpService.Spec.Ports[0].NodePort) != 0 { framework.Failf("TCP Spec.Ports[0].NodePort allocated %d when not expected", tcpService.Spec.Ports[0].NodePort) } if requestedIP != "" && e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP { framework.Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])) } tcpIngressIP := e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) framework.Logf("TCP load balancer: %s", tcpIngressIP) if framework.ProviderIs("gce", "gke") { // Do this as early as possible, which overrides the `defer` above. // This is mostly out of fear of leaking the IP in a timeout case // (as of this writing we're not 100% sure where the leaks are // coming from, so this is first-aid rather than surgery). ginkgo.By("demoting the static IP to ephemeral") if staticIPName != "" { gceCloud, err := gce.GetGCECloud() framework.ExpectNoError(err, "failed to get GCE cloud provider") // Deleting it after it is attached "demotes" it to an // ephemeral IP, which can be auto-released. if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil { framework.Failf("failed to release static IP %s: %v", staticIPName, err) } staticIPName = "" } } ginkgo.By("hitting the TCP service's LoadBalancer") e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout) // Change the services' node ports. ginkgo.By("adding a TCP service's NodePort") tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) { s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true) }) framework.ExpectNoError(err) tcpNodePort := int(tcpService.Spec.Ports[0].NodePort) if tcpNodePort == 0 { framework.Failf("TCP Spec.Ports[0].NodePort (%d) not allocated", tcpNodePort) } if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP { framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])) } framework.Logf("TCP node port: %d", tcpNodePort) ginkgo.By("hitting the TCP service's LoadBalancer") e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout) }) ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a LoadBalancer service on different nodes", func(ctx context.Context) { // requires cloud load-balancer support e2eskipper.SkipUnlessProviderIs("gce", "gke", "azure") ns := f.Namespace.Name nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2) framework.ExpectNoError(err) if len(nodes.Items) < 2 { e2eskipper.Skipf( "Test requires >= 2 Ready nodes, but there are only %v nodes", len(nodes.Items)) } loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) // Create a LoadBalancer service udpJig := e2eservice.NewTestJig(cs, ns, serviceName) ginkgo.By("creating a UDP service " + serviceName + " with type=LoadBalancer in " + ns) _, err = udpJig.CreateUDPService(ctx, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.Ports = []v1.ServicePort{ {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)}, } }) framework.ExpectNoError(err) var udpIngressIP string ginkgo.By("waiting for the UDP service to have a load balancer") udpService, err := udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout) framework.ExpectNoError(err) udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) framework.Logf("UDP load balancer: %s", udpIngressIP) // keep hitting the loadbalancer to check it fails over to the second pod ginkgo.By("hitting the UDP service's LoadBalancer with same source port") stopCh := make(chan struct{}) defer close(stopCh) var mu sync.Mutex hostnames := sets.NewString() go func() { defer ginkgo.GinkgoRecover() port := int(udpService.Spec.Ports[0].Port) laddr, err := net.ResolveUDPAddr("udp", ":54321") if err != nil { framework.Failf("Failed to resolve local address: %v", err) } raddr := net.UDPAddr{IP: netutils.ParseIPSloppy(udpIngressIP), Port: port} for { select { case <-stopCh: if len(hostnames) != 2 { framework.Failf("Failed to hit the 2 UDP LoadBalancer backends successfully, got %v", hostnames.List()) } return default: time.Sleep(1 * time.Second) } conn, err := net.DialUDP("udp", laddr, &raddr) if err != nil { framework.Logf("Failed to connect to: %s %d", udpIngressIP, port) continue } conn.SetDeadline(time.Now().Add(3 * time.Second)) framework.Logf("Connected successfully to: %s", raddr.String()) conn.Write([]byte("hostname\n")) buff := make([]byte, 1024) n, _, err := conn.ReadFrom(buff) if err == nil { mu.Lock() hostnames.Insert(string(buff[:n])) mu.Unlock() framework.Logf("Connected successfully to hostname: %s", string(buff[:n])) } conn.Close() } }() // Add a backend pod to the service in one node ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName) serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80)) serverPod1.Labels = udpJig.Labels serverPod1.Spec.Hostname = "hostname1" nodeSelection := e2epod.NodeSelection{Name: nodes.Items[0].Name} e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection) e2epod.NewPodClient(f).CreateSync(ctx, serverPod1) validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend1: {80}}) // Note that the fact that Endpoints object already exists, does NOT mean // that iptables (or whatever else is used) was already programmed. // Additionally take into account that UDP conntract entries timeout is // 30 seconds by default. // Based on the above check if the pod receives the traffic. ginkgo.By("checking client pod connected to the backend 1 on Node " + nodes.Items[0].Name) if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) { mu.Lock() defer mu.Unlock() return hostnames.Has(serverPod1.Spec.Hostname), nil }); err != nil { framework.Failf("Failed to connect to backend 1") } // Create a second pod ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName) serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80)) serverPod2.Labels = udpJig.Labels serverPod2.Spec.Hostname = "hostname2" nodeSelection = e2epod.NodeSelection{Name: nodes.Items[1].Name} e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection) e2epod.NewPodClient(f).CreateSync(ctx, serverPod2) // and delete the first pod framework.Logf("Cleaning up %s pod", podBackend1) e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout) validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend2: {80}}) // Check that the second pod keeps receiving traffic // UDP conntrack entries timeout is 30 sec by default ginkgo.By("checking client pod connected to the backend 2 on Node " + nodes.Items[1].Name) if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) { mu.Lock() defer mu.Unlock() return hostnames.Has(serverPod2.Spec.Hostname), nil }); err != nil { framework.Failf("Failed to connect to backend 2") } }) ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a LoadBalancer service on the same nodes", func(ctx context.Context) { // requires cloud load-balancer support e2eskipper.SkipUnlessProviderIs("gce", "gke", "azure") ns := f.Namespace.Name nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 1) framework.ExpectNoError(err) if len(nodes.Items) < 1 { e2eskipper.Skipf( "Test requires >= 1 Ready nodes, but there are only %d nodes", len(nodes.Items)) } loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) // Create a LoadBalancer service udpJig := e2eservice.NewTestJig(cs, ns, serviceName) ginkgo.By("creating a UDP service " + serviceName + " with type=LoadBalancer in " + ns) _, err = udpJig.CreateUDPService(ctx, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer svc.Spec.Ports = []v1.ServicePort{ {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)}, } }) framework.ExpectNoError(err) var udpIngressIP string ginkgo.By("waiting for the UDP service to have a load balancer") udpService, err := udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout) framework.ExpectNoError(err) udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) framework.Logf("UDP load balancer: %s", udpIngressIP) // keep hitting the loadbalancer to check it fails over to the second pod ginkgo.By("hitting the UDP service's LoadBalancer with same source port") stopCh := make(chan struct{}) defer close(stopCh) var mu sync.Mutex hostnames := sets.NewString() go func() { defer ginkgo.GinkgoRecover() port := int(udpService.Spec.Ports[0].Port) laddr, err := net.ResolveUDPAddr("udp", ":54322") if err != nil { framework.Failf("Failed to resolve local address: %v", err) } raddr := net.UDPAddr{IP: netutils.ParseIPSloppy(udpIngressIP), Port: port} for { select { case <-stopCh: if len(hostnames) != 2 { framework.Failf("Failed to hit the 2 UDP LoadBalancer backends successfully, got %v", hostnames.List()) } return default: time.Sleep(1 * time.Second) } conn, err := net.DialUDP("udp", laddr, &raddr) if err != nil { framework.Logf("Failed to connect to: %s %d", udpIngressIP, port) continue } conn.SetDeadline(time.Now().Add(3 * time.Second)) framework.Logf("Connected successfully to: %s", raddr.String()) conn.Write([]byte("hostname\n")) buff := make([]byte, 1024) n, _, err := conn.ReadFrom(buff) if err == nil { mu.Lock() hostnames.Insert(string(buff[:n])) mu.Unlock() framework.Logf("Connected successfully to hostname: %s", string(buff[:n])) } conn.Close() } }() // Add a backend pod to the service in one node ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName) serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80)) serverPod1.Labels = udpJig.Labels serverPod1.Spec.Hostname = "hostname1" nodeSelection := e2epod.NodeSelection{Name: nodes.Items[0].Name} e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection) e2epod.NewPodClient(f).CreateSync(ctx, serverPod1) validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend1: {80}}) // Note that the fact that Endpoints object already exists, does NOT mean // that iptables (or whatever else is used) was already programmed. // Additionally take into account that UDP conntract entries timeout is // 30 seconds by default. // Based on the above check if the pod receives the traffic. ginkgo.By("checking client pod connected to the backend 1 on Node " + nodes.Items[0].Name) if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) { mu.Lock() defer mu.Unlock() return hostnames.Has(serverPod1.Spec.Hostname), nil }); err != nil { framework.Failf("Failed to connect to backend 1") } // Create a second pod on the same node ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName) serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80)) serverPod2.Labels = udpJig.Labels serverPod2.Spec.Hostname = "hostname2" // use the same node as previous pod e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection) e2epod.NewPodClient(f).CreateSync(ctx, serverPod2) // and delete the first pod framework.Logf("Cleaning up %s pod", podBackend1) e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout) validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend2: {80}}) // Check that the second pod keeps receiving traffic // UDP conntrack entries timeout is 30 sec by default ginkgo.By("checking client pod connected to the backend 2 on Node " + nodes.Items[0].Name) if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) { mu.Lock() defer mu.Unlock() return hostnames.Has(serverPod2.Spec.Hostname), nil }); err != nil { framework.Failf("Failed to connect to backend 2") } }) f.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Cluster", f.WithSlow(), func(ctx context.Context) { // We start with a low but reasonable threshold to analyze the results. // The goal is to achieve 99% minimum success rate. // TODO: We should do incremental steps toward the goal. minSuccessRate := 0.95 testRollingUpdateLBConnectivityDisruption(ctx, f, v1.ServiceExternalTrafficPolicyTypeCluster, minSuccessRate) }) f.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Local", f.WithSlow(), func(ctx context.Context) { // We start with a low but reasonable threshold to analyze the results. // The goal is to achieve 99% minimum success rate. // TODO: We should do incremental steps toward the goal. minSuccessRate := 0.95 testRollingUpdateLBConnectivityDisruption(ctx, f, v1.ServiceExternalTrafficPolicyTypeLocal, minSuccessRate) }) }) var _ = common.SIGDescribe("LoadBalancers ESIPP", framework.WithSlow(), func() { f := framework.NewDefaultFramework("esipp") f.NamespacePodSecurityLevel = admissionapi.LevelBaseline var loadBalancerCreateTimeout time.Duration var cs clientset.Interface var subnetPrefix *net.IPNet var err error ginkgo.BeforeEach(func(ctx context.Context) { // requires cloud load-balancer support - this feature currently supported only on GCE/GKE e2eskipper.SkipUnlessProviderIs("gce", "gke") cs = f.ClientSet loadBalancerCreateTimeout = e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) subnetPrefix, err = getSubnetPrefix(ctx, cs) framework.ExpectNoError(err) }) ginkgo.AfterEach(func(ctx context.Context) { if ginkgo.CurrentSpecReport().Failed() { DescribeSvc(f.Namespace.Name) } }) ginkgo.It("should work for type=LoadBalancer", func(ctx context.Context) { namespace := f.Namespace.Name serviceName := "external-local-lb" jig := e2eservice.NewTestJig(cs, namespace, serviceName) svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil) framework.ExpectNoError(err) healthCheckNodePort := int(svc.Spec.HealthCheckNodePort) if healthCheckNodePort == 0 { framework.Failf("Service HealthCheck NodePort was not allocated") } ginkgo.DeferCleanup(func(ctx context.Context) { err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout) framework.ExpectNoError(err) // Make sure we didn't leak the health check node port. const threshold = 2 nodes, err := getEndpointNodesWithInternalIP(ctx, jig) framework.ExpectNoError(err) config := e2enetwork.NewNetworkingTestConfig(ctx, f) for _, internalIP := range nodes { err := testHTTPHealthCheckNodePortFromTestContainer(ctx, config, internalIP, healthCheckNodePort, e2eservice.KubeProxyLagTimeout, false, threshold) framework.ExpectNoError(err) } err = cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) framework.ExpectNoError(err) }) svcTCPPort := int(svc.Spec.Ports[0].Port) ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) ginkgo.By("reading clientIP using the TCP service's service port via its external VIP") clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip") framework.ExpectNoError(err) framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIPPort) ginkgo.By("checking if Source IP is preserved") // The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port host, _, err := net.SplitHostPort(clientIPPort) if err != nil { framework.Failf("SplitHostPort returned unexpected error: %q", clientIPPort) } ip := netutils.ParseIPSloppy(host) if ip == nil { framework.Failf("Invalid client IP address format: %q", host) } if subnetPrefix.Contains(ip) { framework.Failf("Source IP was NOT preserved") } }) ginkgo.It("should work for type=NodePort", func(ctx context.Context) { namespace := f.Namespace.Name serviceName := "external-local-nodeport" jig := e2eservice.NewTestJig(cs, namespace, serviceName) svc, err := jig.CreateOnlyLocalNodePortService(ctx, true) framework.ExpectNoError(err) ginkgo.DeferCleanup(func(ctx context.Context) { err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) framework.ExpectNoError(err) }) tcpNodePort := int(svc.Spec.Ports[0].NodePort) endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) framework.ExpectNoError(err) dialCmd := "clientip" config := e2enetwork.NewNetworkingTestConfig(ctx, f) for nodeName, nodeIP := range endpointsNodeMap { ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd)) clientIP, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd) framework.ExpectNoError(err) framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP) // the clientIP returned by agnhost contains port if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) { framework.Failf("Source IP was NOT preserved") } } }) ginkgo.It("should only target nodes with endpoints", func(ctx context.Context) { namespace := f.Namespace.Name serviceName := "external-local-nodes" jig := e2eservice.NewTestJig(cs, namespace, serviceName) nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests) framework.ExpectNoError(err) svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, false, func(svc *v1.Service) { // Change service port to avoid collision with opened hostPorts // in other tests that run in parallel. if len(svc.Spec.Ports) != 0 { svc.Spec.Ports[0].TargetPort = intstr.FromInt32(svc.Spec.Ports[0].Port) svc.Spec.Ports[0].Port = 8081 } }) framework.ExpectNoError(err) ginkgo.DeferCleanup(func(ctx context.Context) { err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout) framework.ExpectNoError(err) err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) framework.ExpectNoError(err) }) healthCheckNodePort := int(svc.Spec.HealthCheckNodePort) if healthCheckNodePort == 0 { framework.Failf("Service HealthCheck NodePort was not allocated") } ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP) ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) svcTCPPort := int(svc.Spec.Ports[0].Port) const threshold = 2 config := e2enetwork.NewNetworkingTestConfig(ctx, f) for i := 0; i < len(nodes.Items); i++ { endpointNodeName := nodes.Items[i].Name ginkgo.By("creating a pod to be part of the service " + serviceName + " on node " + endpointNodeName) _, err = jig.Run(ctx, func(rc *v1.ReplicationController) { rc.Name = serviceName if endpointNodeName != "" { rc.Spec.Template.Spec.NodeName = endpointNodeName } }) framework.ExpectNoError(err) ginkgo.By(fmt.Sprintf("waiting for service endpoint on node %v", endpointNodeName)) err = jig.WaitForEndpointOnNode(ctx, endpointNodeName) framework.ExpectNoError(err) // HealthCheck should pass only on the node where num(endpoints) > 0 // All other nodes should fail the healthcheck on the service healthCheckNodePort for n, internalIP := range ips { // Make sure the loadbalancer picked up the health check change. // Confirm traffic can reach backend through LB before checking healthcheck nodeport. e2eservice.TestReachableHTTP(ctx, ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout) expectedSuccess := nodes.Items[n].Name == endpointNodeName port := strconv.Itoa(healthCheckNodePort) ipPort := net.JoinHostPort(internalIP, port) framework.Logf("Health checking %s, http://%s/healthz, expectedSuccess %v", nodes.Items[n].Name, ipPort, expectedSuccess) err := testHTTPHealthCheckNodePortFromTestContainer(ctx, config, internalIP, healthCheckNodePort, e2eservice.KubeProxyEndpointLagTimeout, expectedSuccess, threshold) framework.ExpectNoError(err) } framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, namespace, serviceName)) } }) ginkgo.It("should work from pods", func(ctx context.Context) { var err error namespace := f.Namespace.Name serviceName := "external-local-pods" jig := e2eservice.NewTestJig(cs, namespace, serviceName) svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil) framework.ExpectNoError(err) ginkgo.DeferCleanup(func(ctx context.Context) { err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout) framework.ExpectNoError(err) err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) framework.ExpectNoError(err) }) ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) port := strconv.Itoa(int(svc.Spec.Ports[0].Port)) ipPort := net.JoinHostPort(ingressIP, port) path := fmt.Sprintf("%s/clientip", ipPort) ginkgo.By("Creating pause pod deployment to make sure, pausePods are in desired state") deployment := createPausePodDeployment(ctx, cs, "pause-pod-deployment", namespace, 1) framework.ExpectNoError(e2edeployment.WaitForDeploymentComplete(cs, deployment), "Failed to complete pause pod deployment") ginkgo.DeferCleanup(func(ctx context.Context) { framework.Logf("Deleting deployment") err = cs.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) framework.ExpectNoError(err, "Failed to delete deployment %s", deployment.Name) }) deployment, err = cs.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) framework.ExpectNoError(err, "Error in retrieving pause pod deployment") labelSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) framework.ExpectNoError(err, "Error in setting LabelSelector as selector from deployment") pausePods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()}) framework.ExpectNoError(err, "Error in listing pods associated with pause pod deployments") pausePod := pausePods.Items[0] framework.Logf("Waiting up to %v curl %v", e2eservice.KubeProxyLagTimeout, path) cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %v`, path) var srcIP string loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs) ginkgo.By(fmt.Sprintf("Hitting external lb %v from pod %v on node %v", ingressIP, pausePod.Name, pausePod.Spec.NodeName)) if pollErr := wait.PollImmediate(framework.Poll, loadBalancerPropagationTimeout, func() (bool, error) { stdout, err := e2eoutput.RunHostCmd(pausePod.Namespace, pausePod.Name, cmd) if err != nil { framework.Logf("got err: %v, retry until timeout", err) return false, nil } srcIP = strings.TrimSpace(strings.Split(stdout, ":")[0]) return srcIP == pausePod.Status.PodIP, nil }); pollErr != nil { framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", pausePod.Name, pausePod.Status.PodIP, srcIP) } }) ginkgo.It("should handle updates to ExternalTrafficPolicy field", func(ctx context.Context) { namespace := f.Namespace.Name serviceName := "external-local-update" jig := e2eservice.NewTestJig(cs, namespace, serviceName) nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests) framework.ExpectNoError(err) if len(nodes.Items) < 2 { framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint") } svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil) framework.ExpectNoError(err) ginkgo.DeferCleanup(func(ctx context.Context) { err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout) framework.ExpectNoError(err) err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) framework.ExpectNoError(err) }) // save the health check node port because it disappears when ESIPP is turned off. healthCheckNodePort := int(svc.Spec.HealthCheckNodePort) ginkgo.By("turning ESIPP off") svc, err = jig.UpdateService(ctx, func(svc *v1.Service) { svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster }) framework.ExpectNoError(err) if svc.Spec.HealthCheckNodePort > 0 { framework.Failf("Service HealthCheck NodePort still present") } epNodes, err := jig.ListNodesWithEndpoint(ctx) framework.ExpectNoError(err) // map from name of nodes with endpoint to internal ip // it is assumed that there is only a single node with the endpoint endpointNodeMap := make(map[string]string) // map from name of nodes without endpoint to internal ip noEndpointNodeMap := make(map[string]string) for _, node := range epNodes { ips := e2enode.GetAddresses(&node, v1.NodeInternalIP) if len(ips) < 1 { framework.Failf("No internal ip found for node %s", node.Name) } endpointNodeMap[node.Name] = ips[0] } for _, n := range nodes.Items { ips := e2enode.GetAddresses(&n, v1.NodeInternalIP) if len(ips) < 1 { framework.Failf("No internal ip found for node %s", n.Name) } if _, ok := endpointNodeMap[n.Name]; !ok { noEndpointNodeMap[n.Name] = ips[0] } } gomega.Expect(endpointNodeMap).ToNot(gomega.BeEmpty()) gomega.Expect(noEndpointNodeMap).ToNot(gomega.BeEmpty()) svcTCPPort := int(svc.Spec.Ports[0].Port) svcNodePort := int(svc.Spec.Ports[0].NodePort) ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) path := "/clientip" dialCmd := "clientip" config := e2enetwork.NewNetworkingTestConfig(ctx, f) ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap)) for nodeName, nodeIP := range noEndpointNodeMap { ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd)) _, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd) framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout) } for nodeName, nodeIP := range endpointNodeMap { ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP)) var body string pollFn := func() (bool, error) { // we expect connection failure here, but not other errors resp, err := config.GetResponseFromTestContainer(ctx, "http", "healthz", nodeIP, healthCheckNodePort) if err != nil { return false, nil } if len(resp.Errors) > 0 { return true, nil } if len(resp.Responses) > 0 { body = resp.Responses[0] } return false, nil } if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollFn); pollErr != nil { framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s", nodeName, healthCheckNodePort, body) } } // Poll till kube-proxy re-adds the MASQUERADE rule on the node. ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP)) var clientIP string pollErr := wait.PollImmediate(framework.Poll, 3*e2eservice.KubeProxyLagTimeout, func() (bool, error) { clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path) if err != nil { return false, nil } // The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port host, _, err := net.SplitHostPort(clientIPPort) if err != nil { framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort) return false, nil } ip := netutils.ParseIPSloppy(host) if ip == nil { framework.Logf("Invalid client IP address format: %q", host) return false, nil } if subnetPrefix.Contains(ip) { return true, nil } return false, nil }) if pollErr != nil { framework.Failf("Source IP WAS preserved even after ESIPP turned off. Got %v, expected a ten-dot cluster ip.", clientIP) } // TODO: We need to attempt to create another service with the previously // allocated healthcheck nodePort. If the health check nodePort has been // freed, the new service creation will succeed, upon which we cleanup. // If the health check nodePort has NOT been freed, the new service // creation will fail. ginkgo.By("setting ExternalTraffic field back to OnlyLocal") svc, err = jig.UpdateService(ctx, func(svc *v1.Service) { svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal // Request the same healthCheckNodePort as before, to test the user-requested allocation path svc.Spec.HealthCheckNodePort = int32(healthCheckNodePort) }) framework.ExpectNoError(err) loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs) pollErr = wait.PollImmediate(framework.PollShortTimeout, loadBalancerPropagationTimeout, func() (bool, error) { clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path) if err != nil { return false, nil } ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIPPort)) // The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port host, _, err := net.SplitHostPort(clientIPPort) if err != nil { framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort) return false, nil } ip := netutils.ParseIPSloppy(host) if ip == nil { framework.Logf("Invalid client IP address format: %q", host) return false, nil } if !subnetPrefix.Contains(ip) { return true, nil } return false, nil }) if pollErr != nil { framework.Failf("Source IP (%v) is not the client IP even after ESIPP turned on, expected a public IP.", clientIP) } }) }) func testRollingUpdateLBConnectivityDisruption(ctx context.Context, f *framework.Framework, externalTrafficPolicy v1.ServiceExternalTrafficPolicyType, minSuccessRate float64) { cs := f.ClientSet ns := f.Namespace.Name name := "test-lb-rolling-update" labels := map[string]string{"name": name} gracePeriod := int64(60) maxUnavailable := intstr.FromString("10%") ds := e2edaemonset.NewDaemonSet(name, e2eapps.AgnhostImage, labels, nil, nil, []v1.ContainerPort{ {ContainerPort: 80}, }, "netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod), ) ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{ Type: appsv1.RollingUpdateDaemonSetStrategyType, RollingUpdate: &appsv1.RollingUpdateDaemonSet{ MaxUnavailable: &maxUnavailable, }, } ds.Spec.Template.Labels = labels ds.Spec.Template.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod) nodeNames := e2edaemonset.SchedulableNodes(ctx, cs, ds) e2eskipper.SkipUnlessAtLeast(len(nodeNames), 2, "load-balancer rolling update test requires at least 2 schedulable nodes for the DaemonSet") if len(nodeNames) > 25 { e2eskipper.Skipf("load-balancer rolling update test skipped for large environments with more than 25 nodes") } ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", name)) ds, err := cs.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) framework.ExpectNoError(err) ginkgo.By("Checking that daemon pods launch on every schedulable node of the cluster") creationTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) err = wait.PollUntilContextTimeout(ctx, framework.Poll, creationTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, nodeNames)) framework.ExpectNoError(err, "error waiting for daemon pods to start") err = e2edaemonset.CheckDaemonStatus(ctx, f, name) framework.ExpectNoError(err) ginkgo.By(fmt.Sprintf("Creating a service %s with type=LoadBalancer externalTrafficPolicy=%s in namespace %s", name, externalTrafficPolicy, ns)) jig := e2eservice.NewTestJig(cs, ns, name) jig.Labels = labels service, err := jig.CreateLoadBalancerService(ctx, creationTimeout, func(svc *v1.Service) { svc.Spec.ExternalTrafficPolicy = externalTrafficPolicy }) framework.ExpectNoError(err) lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0]) svcPort := int(service.Spec.Ports[0].Port) ginkgo.By("Hitting the DaemonSet's pods through the service's load balancer") timeout := e2eservice.LoadBalancerLagTimeoutDefault if framework.ProviderIs("aws") { timeout = e2eservice.LoadBalancerLagTimeoutAWS } e2eservice.TestReachableHTTP(ctx, lbNameOrAddress, svcPort, timeout) ginkgo.By("Starting a goroutine to continuously hit the DaemonSet's pods through the service's load balancer") var totalRequests uint64 = 0 var networkErrors uint64 = 0 var httpErrors uint64 = 0 done := make(chan struct{}) defer close(done) go func() { defer ginkgo.GinkgoRecover() wait.Until(func() { atomic.AddUint64(&totalRequests, 1) client := &http.Client{ Transport: utilnet.SetTransportDefaults(&http.Transport{ DisableKeepAlives: true, }), Timeout: 5 * time.Second, } ipPort := net.JoinHostPort(lbNameOrAddress, strconv.Itoa(svcPort)) msg := "hello" url := fmt.Sprintf("http://%s/echo?msg=%s", ipPort, msg) resp, err := client.Get(url) if err != nil { framework.Logf("Got error testing for reachability of %s: %v", url, err) atomic.AddUint64(&networkErrors, 1) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { framework.Logf("Got bad status code: %d", resp.StatusCode) atomic.AddUint64(&httpErrors, 1) return } body, err := io.ReadAll(resp.Body) if err != nil { framework.Logf("Got error reading HTTP body: %v", err) atomic.AddUint64(&httpErrors, 1) return } if string(body) != msg { framework.Logf("The response body does not contain expected string %s", string(body)) atomic.AddUint64(&httpErrors, 1) return } }, time.Duration(0), done) }() ginkgo.By("Triggering DaemonSet rolling update several times") var previousTotalRequests uint64 = 0 var previousNetworkErrors uint64 = 0 var previousHttpErrors uint64 = 0 for i := 1; i <= 5; i++ { framework.Logf("Update daemon pods environment: [{\"name\":\"VERSION\",\"value\":\"%d\"}]", i) patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%d"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, i) ds, err = cs.AppsV1().DaemonSets(ns).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) framework.ExpectNoError(err) framework.Logf("Check that daemon pods are available on every node of the cluster with the updated environment.") err = wait.PollImmediate(framework.Poll, creationTimeout, func() (bool, error) { podList, err := cs.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { return false, err } pods := podList.Items readyPods := 0 for _, pod := range pods { if !metav1.IsControlledBy(&pod, ds) { continue } if pod.DeletionTimestamp != nil { continue } podVersion := "" for _, env := range pod.Spec.Containers[0].Env { if env.Name == "VERSION" { podVersion = env.Value break } } if podVersion != fmt.Sprintf("%d", i) { continue } podReady := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) if !podReady { continue } readyPods += 1 } framework.Logf("Number of running nodes: %d, number of updated ready pods: %d in daemonset %s", len(nodeNames), readyPods, ds.Name) return readyPods == len(nodeNames), nil }) framework.ExpectNoError(err, "error waiting for daemon pods to be ready") // assert that the HTTP requests success rate is above the acceptable threshold after this rolling update currentTotalRequests := atomic.LoadUint64(&totalRequests) currentNetworkErrors := atomic.LoadUint64(&networkErrors) currentHttpErrors := atomic.LoadUint64(&httpErrors) partialTotalRequests := currentTotalRequests - previousTotalRequests partialNetworkErrors := currentNetworkErrors - previousNetworkErrors partialHttpErrors := currentHttpErrors - previousHttpErrors partialSuccessRate := (float64(partialTotalRequests) - float64(partialNetworkErrors+partialHttpErrors)) / float64(partialTotalRequests) framework.Logf("Load Balancer total HTTP requests: %d", partialTotalRequests) framework.Logf("Network errors: %d", partialNetworkErrors) framework.Logf("HTTP errors: %d", partialHttpErrors) framework.Logf("Success rate: %.2f%%", partialSuccessRate*100) if partialSuccessRate < minSuccessRate { framework.Failf("Encountered too many errors when doing HTTP requests to the load balancer address. Success rate is %.2f%%, and the minimum allowed threshold is %.2f%%.", partialSuccessRate*100, minSuccessRate*100) } previousTotalRequests = currentTotalRequests previousNetworkErrors = currentNetworkErrors previousHttpErrors = currentHttpErrors } // assert that the load balancer address is still reachable after the rolling updates are finished e2eservice.TestReachableHTTP(ctx, lbNameOrAddress, svcPort, timeout) }