...

Source file src/k8s.io/kubernetes/test/e2e/network/loadbalancer.go

Documentation: k8s.io/kubernetes/test/e2e/network

     1  //go:build !providerless
     2  // +build !providerless
     3  
     4  /*
     5  Copyright 2016 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package network
    21  
    22  import (
    23  	"context"
    24  	"fmt"
    25  	"io"
    26  	"math/big"
    27  	"net"
    28  	"net/http"
    29  	"strconv"
    30  	"strings"
    31  	"sync"
    32  	"sync/atomic"
    33  	"time"
    34  
    35  	compute "google.golang.org/api/compute/v1"
    36  
    37  	appsv1 "k8s.io/api/apps/v1"
    38  	v1 "k8s.io/api/core/v1"
    39  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    40  	"k8s.io/apimachinery/pkg/types"
    41  	"k8s.io/apimachinery/pkg/util/intstr"
    42  	utilnet "k8s.io/apimachinery/pkg/util/net"
    43  	"k8s.io/apimachinery/pkg/util/sets"
    44  	"k8s.io/apimachinery/pkg/util/wait"
    45  	clientset "k8s.io/client-go/kubernetes"
    46  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    47  	e2eapps "k8s.io/kubernetes/test/e2e/apps"
    48  	"k8s.io/kubernetes/test/e2e/framework"
    49  	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
    50  	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
    51  	e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
    52  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    53  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    54  	e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
    55  	"k8s.io/kubernetes/test/e2e/framework/providers/gce"
    56  	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
    57  	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
    58  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    59  	"k8s.io/kubernetes/test/e2e/network/common"
    60  	admissionapi "k8s.io/pod-security-admission/api"
    61  	netutils "k8s.io/utils/net"
    62  	utilpointer "k8s.io/utils/pointer"
    63  
    64  	"github.com/onsi/ginkgo/v2"
    65  	"github.com/onsi/gomega"
    66  )
    67  
    68  // getInternalIP returns node internal IP
    69  func getInternalIP(node *v1.Node) (string, error) {
    70  	for _, address := range node.Status.Addresses {
    71  		if address.Type == v1.NodeInternalIP && address.Address != "" {
    72  			return address.Address, nil
    73  		}
    74  	}
    75  	return "", fmt.Errorf("couldn't get the internal IP of host %s with addresses %v", node.Name, node.Status.Addresses)
    76  }
    77  
    78  // getSubnetPrefix returns a network prefix based on one of the workers
    79  // InternalIP adding a /16 or /64 mask depending on the IP family of the node.
    80  // IMPORTANT: These assumes a flat network assigned to the nodes, that is common
    81  // on cloud providers.
    82  func getSubnetPrefix(ctx context.Context, c clientset.Interface) (*net.IPNet, error) {
    83  	node, err := getReadySchedulableWorkerNode(ctx, c)
    84  	if err != nil {
    85  		return nil, fmt.Errorf("error getting a ready schedulable worker Node, err: %w", err)
    86  	}
    87  	internalIP, err := getInternalIP(node)
    88  	if err != nil {
    89  		return nil, fmt.Errorf("error getting Node internal IP, err: %w", err)
    90  	}
    91  	ip := netutils.ParseIPSloppy(internalIP)
    92  	if ip == nil {
    93  		return nil, fmt.Errorf("invalid IP address format: %s", internalIP)
    94  	}
    95  
    96  	// if IPv6 return a net.IPNet with IP = ip and mask /64
    97  	ciderMask := net.CIDRMask(64, 128)
    98  	// if IPv4 return a net.IPNet with IP = ip and mask /16
    99  	if netutils.IsIPv4(ip) {
   100  		ciderMask = net.CIDRMask(16, 32)
   101  	}
   102  	return &net.IPNet{IP: ip.Mask(ciderMask), Mask: ciderMask}, nil
   103  }
   104  
   105  // getReadySchedulableWorkerNode gets a single worker node which is available for
   106  // running pods on. If there are no such available nodes it will return an error.
   107  func getReadySchedulableWorkerNode(ctx context.Context, c clientset.Interface) (*v1.Node, error) {
   108  	nodes, err := e2enode.GetReadySchedulableNodes(ctx, c)
   109  	if err != nil {
   110  		return nil, err
   111  	}
   112  	for i := range nodes.Items {
   113  		node := nodes.Items[i]
   114  		_, isMaster := node.Labels["node-role.kubernetes.io/master"]
   115  		_, isControlPlane := node.Labels["node-role.kubernetes.io/control-plane"]
   116  		if !isMaster && !isControlPlane {
   117  			return &node, nil
   118  		}
   119  	}
   120  	return nil, fmt.Errorf("there are currently no ready, schedulable worker nodes in the cluster")
   121  }
   122  
   123  var _ = common.SIGDescribe("LoadBalancers", func() {
   124  	f := framework.NewDefaultFramework("loadbalancers")
   125  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
   126  
   127  	var cs clientset.Interface
   128  	var subnetPrefix *net.IPNet
   129  	var err error
   130  
   131  	ginkgo.BeforeEach(func(ctx context.Context) {
   132  		cs = f.ClientSet
   133  		subnetPrefix, err = getSubnetPrefix(ctx, cs)
   134  		framework.ExpectNoError(err)
   135  	})
   136  
   137  	ginkgo.AfterEach(func(ctx context.Context) {
   138  		if ginkgo.CurrentSpecReport().Failed() {
   139  			DescribeSvc(f.Namespace.Name)
   140  		}
   141  	})
   142  
   143  	f.It("should be able to change the type and ports of a TCP service", f.WithSlow(), func(ctx context.Context) {
   144  		// requires cloud load-balancer support
   145  		e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
   146  
   147  		loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
   148  		if framework.ProviderIs("aws") {
   149  			loadBalancerLagTimeout = e2eservice.LoadBalancerLagTimeoutAWS
   150  		}
   151  		loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
   152  
   153  		// This test is more monolithic than we'd like because LB turnup can be
   154  		// very slow, so we lumped all the tests into one LB lifecycle.
   155  
   156  		serviceName := "mutability-test"
   157  		ns1 := f.Namespace.Name // LB1 in ns1 on TCP
   158  		framework.Logf("namespace for TCP test: %s", ns1)
   159  
   160  		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1)
   161  		tcpJig := e2eservice.NewTestJig(cs, ns1, serviceName)
   162  		tcpService, err := tcpJig.CreateTCPService(ctx, nil)
   163  		framework.ExpectNoError(err)
   164  
   165  		svcPort := int(tcpService.Spec.Ports[0].Port)
   166  		framework.Logf("service port TCP: %d", svcPort)
   167  
   168  		ginkgo.By("creating a pod to be part of the TCP service " + serviceName)
   169  		_, err = tcpJig.Run(ctx, nil)
   170  		framework.ExpectNoError(err)
   171  
   172  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns1, "execpod", nil)
   173  		err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod)
   174  		framework.ExpectNoError(err)
   175  
   176  		// Change the services to NodePort.
   177  
   178  		ginkgo.By("changing the TCP service to type=NodePort")
   179  		tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
   180  			s.Spec.Type = v1.ServiceTypeNodePort
   181  		})
   182  		framework.ExpectNoError(err)
   183  		tcpNodePort := int(tcpService.Spec.Ports[0].NodePort)
   184  		framework.Logf("TCP node port: %d", tcpNodePort)
   185  
   186  		err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod)
   187  		framework.ExpectNoError(err)
   188  
   189  		// Change the services to LoadBalancer.
   190  
   191  		// Here we test that LoadBalancers can receive static IP addresses.  This isn't
   192  		// necessary, but is an additional feature this monolithic test checks.
   193  		requestedIP := ""
   194  		staticIPName := ""
   195  		if framework.ProviderIs("gce", "gke") {
   196  			ginkgo.By("creating a static load balancer IP")
   197  			staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID)
   198  			gceCloud, err := gce.GetGCECloud()
   199  			framework.ExpectNoError(err, "failed to get GCE cloud provider")
   200  
   201  			err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region())
   202  			defer func() {
   203  				if staticIPName != "" {
   204  					// Release GCE static IP - this is not kube-managed and will not be automatically released.
   205  					if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
   206  						framework.Logf("failed to release static IP %s: %v", staticIPName, err)
   207  					}
   208  				}
   209  			}()
   210  			framework.ExpectNoError(err, "failed to create region address: %s", staticIPName)
   211  			reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region())
   212  			framework.ExpectNoError(err, "failed to get region address: %s", staticIPName)
   213  
   214  			requestedIP = reservedAddr.Address
   215  			framework.Logf("Allocated static load balancer IP: %s", requestedIP)
   216  		}
   217  
   218  		ginkgo.By("changing the TCP service to type=LoadBalancer")
   219  		_, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
   220  			s.Spec.LoadBalancerIP = requestedIP // will be "" if not applicable
   221  			s.Spec.Type = v1.ServiceTypeLoadBalancer
   222  		})
   223  		framework.ExpectNoError(err)
   224  
   225  		ginkgo.By("waiting for the TCP service to have a load balancer")
   226  		// Wait for the load balancer to be created asynchronously
   227  		tcpService, err = tcpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
   228  		framework.ExpectNoError(err)
   229  		if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort {
   230  			framework.Failf("TCP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", tcpNodePort, tcpService.Spec.Ports[0].NodePort)
   231  		}
   232  		if requestedIP != "" && e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP {
   233  			framework.Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
   234  		}
   235  		tcpIngressIP := e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
   236  		framework.Logf("TCP load balancer: %s", tcpIngressIP)
   237  
   238  		if framework.ProviderIs("gce", "gke") {
   239  			// Do this as early as possible, which overrides the `defer` above.
   240  			// This is mostly out of fear of leaking the IP in a timeout case
   241  			// (as of this writing we're not 100% sure where the leaks are
   242  			// coming from, so this is first-aid rather than surgery).
   243  			ginkgo.By("demoting the static IP to ephemeral")
   244  			if staticIPName != "" {
   245  				gceCloud, err := gce.GetGCECloud()
   246  				framework.ExpectNoError(err, "failed to get GCE cloud provider")
   247  				// Deleting it after it is attached "demotes" it to an
   248  				// ephemeral IP, which can be auto-released.
   249  				if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
   250  					framework.Failf("failed to release static IP %s: %v", staticIPName, err)
   251  				}
   252  				staticIPName = ""
   253  			}
   254  		}
   255  
   256  		err = tcpJig.CheckServiceReachability(ctx, tcpService, execPod)
   257  		framework.ExpectNoError(err)
   258  
   259  		ginkgo.By("hitting the TCP service's LoadBalancer")
   260  		e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout)
   261  
   262  		// Change the services' node ports.
   263  
   264  		ginkgo.By("changing the TCP service's NodePort")
   265  		tcpService, err = tcpJig.ChangeServiceNodePort(ctx, tcpNodePort)
   266  		framework.ExpectNoError(err)
   267  		tcpNodePortOld := tcpNodePort
   268  		tcpNodePort = int(tcpService.Spec.Ports[0].NodePort)
   269  		if tcpNodePort == tcpNodePortOld {
   270  			framework.Failf("TCP Spec.Ports[0].NodePort (%d) did not change", tcpNodePort)
   271  		}
   272  		if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
   273  			framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
   274  		}
   275  		framework.Logf("TCP node port: %d", tcpNodePort)
   276  
   277  		ginkgo.By("hitting the TCP service's LoadBalancer")
   278  		e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout)
   279  
   280  		// Change the services' main ports.
   281  
   282  		ginkgo.By("changing the TCP service's port")
   283  		tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
   284  			s.Spec.Ports[0].Port++
   285  		})
   286  		framework.ExpectNoError(err)
   287  		svcPortOld := svcPort
   288  		svcPort = int(tcpService.Spec.Ports[0].Port)
   289  		if svcPort == svcPortOld {
   290  			framework.Failf("TCP Spec.Ports[0].Port (%d) did not change", svcPort)
   291  		}
   292  		if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort {
   293  			framework.Failf("TCP Spec.Ports[0].NodePort (%d) changed", tcpService.Spec.Ports[0].NodePort)
   294  		}
   295  		if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
   296  			framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
   297  		}
   298  
   299  		framework.Logf("service port TCP: %d", svcPort)
   300  
   301  		ginkgo.By("hitting the TCP service's LoadBalancer")
   302  		e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout)
   303  
   304  		ginkgo.By("Scaling the pods to 0")
   305  		err = tcpJig.Scale(ctx, 0)
   306  		framework.ExpectNoError(err)
   307  
   308  		ginkgo.By("looking for ICMP REJECT on the TCP service's LoadBalancer")
   309  		testRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
   310  
   311  		ginkgo.By("Scaling the pods to 1")
   312  		err = tcpJig.Scale(ctx, 1)
   313  		framework.ExpectNoError(err)
   314  
   315  		ginkgo.By("hitting the TCP service's LoadBalancer")
   316  		e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout)
   317  
   318  		// Change the services back to ClusterIP.
   319  
   320  		ginkgo.By("changing TCP service back to type=ClusterIP")
   321  		tcpReadback, err := tcpJig.UpdateService(ctx, func(s *v1.Service) {
   322  			s.Spec.Type = v1.ServiceTypeClusterIP
   323  		})
   324  		framework.ExpectNoError(err)
   325  		if tcpReadback.Spec.Ports[0].NodePort != 0 {
   326  			framework.Fail("TCP Spec.Ports[0].NodePort was not cleared")
   327  		}
   328  		// Wait for the load balancer to be destroyed asynchronously
   329  		_, err = tcpJig.WaitForLoadBalancerDestroy(ctx, tcpIngressIP, svcPort, loadBalancerCreateTimeout)
   330  		framework.ExpectNoError(err)
   331  
   332  		ginkgo.By("checking the TCP LoadBalancer is closed")
   333  		testNotReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
   334  	})
   335  
   336  	f.It("should be able to change the type and ports of a UDP service", f.WithSlow(), func(ctx context.Context) {
   337  		// requires cloud load-balancer support
   338  		e2eskipper.SkipUnlessProviderIs("gce", "gke")
   339  
   340  		loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
   341  		loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
   342  
   343  		// This test is more monolithic than we'd like because LB turnup can be
   344  		// very slow, so we lumped all the tests into one LB lifecycle.
   345  
   346  		serviceName := "mutability-test"
   347  		ns2 := f.Namespace.Name // LB1 in ns2 on TCP
   348  		framework.Logf("namespace for TCP test: %s", ns2)
   349  
   350  		ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in namespace " + ns2)
   351  		udpJig := e2eservice.NewTestJig(cs, ns2, serviceName)
   352  		udpService, err := udpJig.CreateUDPService(ctx, nil)
   353  		framework.ExpectNoError(err)
   354  
   355  		svcPort := int(udpService.Spec.Ports[0].Port)
   356  		framework.Logf("service port UDP: %d", svcPort)
   357  
   358  		ginkgo.By("creating a pod to be part of the UDP service " + serviceName)
   359  		_, err = udpJig.Run(ctx, nil)
   360  		framework.ExpectNoError(err)
   361  
   362  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns2, "execpod", nil)
   363  		err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
   364  		framework.ExpectNoError(err)
   365  
   366  		// Change the services to NodePort.
   367  
   368  		ginkgo.By("changing the UDP service to type=NodePort")
   369  		udpService, err = udpJig.UpdateService(ctx, func(s *v1.Service) {
   370  			s.Spec.Type = v1.ServiceTypeNodePort
   371  		})
   372  		framework.ExpectNoError(err)
   373  		udpNodePort := int(udpService.Spec.Ports[0].NodePort)
   374  		framework.Logf("UDP node port: %d", udpNodePort)
   375  
   376  		err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
   377  		framework.ExpectNoError(err)
   378  
   379  		// Change the services to LoadBalancer.
   380  
   381  		// Here we test that LoadBalancers can receive static IP addresses.  This isn't
   382  		// necessary, but is an additional feature this monolithic test checks.
   383  		requestedIP := ""
   384  		staticIPName := ""
   385  		ginkgo.By("creating a static load balancer IP")
   386  		staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID)
   387  		gceCloud, err := gce.GetGCECloud()
   388  		framework.ExpectNoError(err, "failed to get GCE cloud provider")
   389  
   390  		err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region())
   391  		defer func() {
   392  			if staticIPName != "" {
   393  				// Release GCE static IP - this is not kube-managed and will not be automatically released.
   394  				if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
   395  					framework.Logf("failed to release static IP %s: %v", staticIPName, err)
   396  				}
   397  			}
   398  		}()
   399  		framework.ExpectNoError(err, "failed to create region address: %s", staticIPName)
   400  		reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region())
   401  		framework.ExpectNoError(err, "failed to get region address: %s", staticIPName)
   402  
   403  		requestedIP = reservedAddr.Address
   404  		framework.Logf("Allocated static load balancer IP: %s", requestedIP)
   405  
   406  		ginkgo.By("changing the UDP service to type=LoadBalancer")
   407  		_, err = udpJig.UpdateService(ctx, func(s *v1.Service) {
   408  			s.Spec.Type = v1.ServiceTypeLoadBalancer
   409  		})
   410  		framework.ExpectNoError(err)
   411  
   412  		// Do this as early as possible, which overrides the `defer` above.
   413  		// This is mostly out of fear of leaking the IP in a timeout case
   414  		// (as of this writing we're not 100% sure where the leaks are
   415  		// coming from, so this is first-aid rather than surgery).
   416  		ginkgo.By("demoting the static IP to ephemeral")
   417  		if staticIPName != "" {
   418  			gceCloud, err := gce.GetGCECloud()
   419  			framework.ExpectNoError(err, "failed to get GCE cloud provider")
   420  			// Deleting it after it is attached "demotes" it to an
   421  			// ephemeral IP, which can be auto-released.
   422  			if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
   423  				framework.Failf("failed to release static IP %s: %v", staticIPName, err)
   424  			}
   425  			staticIPName = ""
   426  		}
   427  
   428  		var udpIngressIP string
   429  		ginkgo.By("waiting for the UDP service to have a load balancer")
   430  		// 2nd one should be faster since they ran in parallel.
   431  		udpService, err = udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
   432  		framework.ExpectNoError(err)
   433  		if int(udpService.Spec.Ports[0].NodePort) != udpNodePort {
   434  			framework.Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort)
   435  		}
   436  		udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
   437  		framework.Logf("UDP load balancer: %s", udpIngressIP)
   438  
   439  		err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
   440  		framework.ExpectNoError(err)
   441  
   442  		ginkgo.By("hitting the UDP service's LoadBalancer")
   443  		testReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
   444  
   445  		// Change the services' node ports.
   446  
   447  		ginkgo.By("changing the UDP service's NodePort")
   448  		udpService, err = udpJig.ChangeServiceNodePort(ctx, udpNodePort)
   449  		framework.ExpectNoError(err)
   450  		udpNodePortOld := udpNodePort
   451  		udpNodePort = int(udpService.Spec.Ports[0].NodePort)
   452  		if udpNodePort == udpNodePortOld {
   453  			framework.Failf("UDP Spec.Ports[0].NodePort (%d) did not change", udpNodePort)
   454  		}
   455  		if e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
   456  			framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
   457  		}
   458  		framework.Logf("UDP node port: %d", udpNodePort)
   459  
   460  		err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
   461  		framework.ExpectNoError(err)
   462  
   463  		ginkgo.By("hitting the UDP service's LoadBalancer")
   464  		testReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
   465  
   466  		// Change the services' main ports.
   467  
   468  		ginkgo.By("changing the UDP service's port")
   469  		udpService, err = udpJig.UpdateService(ctx, func(s *v1.Service) {
   470  			s.Spec.Ports[0].Port++
   471  		})
   472  		framework.ExpectNoError(err)
   473  		svcPortOld := svcPort
   474  		svcPort = int(udpService.Spec.Ports[0].Port)
   475  		if svcPort == svcPortOld {
   476  			framework.Failf("UDP Spec.Ports[0].Port (%d) did not change", svcPort)
   477  		}
   478  		if int(udpService.Spec.Ports[0].NodePort) != udpNodePort {
   479  			framework.Failf("UDP Spec.Ports[0].NodePort (%d) changed", udpService.Spec.Ports[0].NodePort)
   480  		}
   481  		if e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
   482  			framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
   483  		}
   484  
   485  		framework.Logf("service port UDP: %d", svcPort)
   486  
   487  		ginkgo.By("hitting the UDP service's NodePort")
   488  		err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
   489  		framework.ExpectNoError(err)
   490  
   491  		ginkgo.By("hitting the UDP service's LoadBalancer")
   492  		testReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
   493  
   494  		ginkgo.By("Scaling the pods to 0")
   495  		err = udpJig.Scale(ctx, 0)
   496  		framework.ExpectNoError(err)
   497  
   498  		ginkgo.By("looking for ICMP REJECT on the UDP service's LoadBalancer")
   499  		testRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
   500  
   501  		ginkgo.By("Scaling the pods to 1")
   502  		err = udpJig.Scale(ctx, 1)
   503  		framework.ExpectNoError(err)
   504  
   505  		ginkgo.By("hitting the UDP service's NodePort")
   506  		err = udpJig.CheckServiceReachability(ctx, udpService, execPod)
   507  		framework.ExpectNoError(err)
   508  
   509  		ginkgo.By("hitting the UDP service's LoadBalancer")
   510  		testReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
   511  
   512  		// Change the services back to ClusterIP.
   513  
   514  		ginkgo.By("changing UDP service back to type=ClusterIP")
   515  		udpReadback, err := udpJig.UpdateService(ctx, func(s *v1.Service) {
   516  			s.Spec.Type = v1.ServiceTypeClusterIP
   517  		})
   518  		framework.ExpectNoError(err)
   519  		if udpReadback.Spec.Ports[0].NodePort != 0 {
   520  			framework.Fail("UDP Spec.Ports[0].NodePort was not cleared")
   521  		}
   522  		// Wait for the load balancer to be destroyed asynchronously
   523  		_, err = udpJig.WaitForLoadBalancerDestroy(ctx, udpIngressIP, svcPort, loadBalancerCreateTimeout)
   524  		framework.ExpectNoError(err)
   525  
   526  		ginkgo.By("checking the UDP LoadBalancer is closed")
   527  		testNotReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
   528  	})
   529  
   530  	f.It("should only allow access from service loadbalancer source ranges", f.WithSlow(), func(ctx context.Context) {
   531  		// this feature currently supported only on GCE/GKE/AWS/AZURE
   532  		e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws", "azure")
   533  
   534  		loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
   535  
   536  		namespace := f.Namespace.Name
   537  		serviceName := "lb-sourcerange"
   538  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
   539  
   540  		ginkgo.By("Prepare allow source ips")
   541  		// prepare the exec pods
   542  		// acceptPod are allowed to access the loadbalancer
   543  		acceptPod := e2epod.CreateExecPodOrFail(ctx, cs, namespace, "execpod-accept", nil)
   544  		dropPod := e2epod.CreateExecPodOrFail(ctx, cs, namespace, "execpod-drop", nil)
   545  
   546  		ginkgo.By("creating a pod to be part of the service " + serviceName)
   547  		// This container is an nginx container listening on port 80
   548  		// See kubernetes/contrib/ingress/echoheaders/nginx.conf for content of response
   549  		_, err := jig.Run(ctx, nil)
   550  		framework.ExpectNoError(err)
   551  		// Make sure acceptPod is running. There are certain chances that pod might be terminated due to unexpected reasons.
   552  		acceptPod, err = cs.CoreV1().Pods(namespace).Get(ctx, acceptPod.Name, metav1.GetOptions{})
   553  		framework.ExpectNoError(err, "Unable to get pod %s", acceptPod.Name)
   554  		gomega.Expect(acceptPod.Status.Phase).To(gomega.Equal(v1.PodRunning))
   555  		gomega.Expect(acceptPod.Status.PodIP).ToNot(gomega.BeEmpty())
   556  
   557  		// Create loadbalancer service with source range from node[0] and podAccept
   558  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
   559  			svc.Spec.Type = v1.ServiceTypeLoadBalancer
   560  			svc.Spec.LoadBalancerSourceRanges = []string{acceptPod.Status.PodIP + "/32"}
   561  		})
   562  		framework.ExpectNoError(err)
   563  
   564  		ginkgo.DeferCleanup(func(ctx context.Context) {
   565  			ginkgo.By("Clean up loadbalancer service")
   566  			e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name)
   567  		})
   568  
   569  		svc, err = jig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
   570  		framework.ExpectNoError(err)
   571  
   572  		ginkgo.By("check reachability from different sources")
   573  		svcIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
   574  		// We should wait until service changes are actually propagated in the cloud-provider,
   575  		// as this may take significant amount of time, especially in large clusters.
   576  		// However, the information whether it was already programmed isn't achievable.
   577  		// So we're resolving it by using loadBalancerCreateTimeout that takes cluster size into account.
   578  		checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP)
   579  		checkReachabilityFromPod(false, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP)
   580  
   581  		// Make sure dropPod is running. There are certain chances that the pod might be terminated due to unexpected reasons.
   582  		dropPod, err = cs.CoreV1().Pods(namespace).Get(ctx, dropPod.Name, metav1.GetOptions{})
   583  		framework.ExpectNoError(err, "Unable to get pod %s", dropPod.Name)
   584  		gomega.Expect(acceptPod.Status.Phase).To(gomega.Equal(v1.PodRunning))
   585  		gomega.Expect(acceptPod.Status.PodIP).ToNot(gomega.BeEmpty())
   586  
   587  		ginkgo.By("Update service LoadBalancerSourceRange and check reachability")
   588  		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
   589  			// only allow access from dropPod
   590  			svc.Spec.LoadBalancerSourceRanges = []string{dropPod.Status.PodIP + "/32"}
   591  		})
   592  		framework.ExpectNoError(err)
   593  
   594  		// We should wait until service changes are actually propagates, as this may take
   595  		// significant amount of time, especially in large clusters.
   596  		// However, the information whether it was already programmed isn't achievable.
   597  		// So we're resolving it by using loadBalancerCreateTimeout that takes cluster size into account.
   598  		checkReachabilityFromPod(false, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP)
   599  		checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP)
   600  
   601  		ginkgo.By("Delete LoadBalancerSourceRange field and check reachability")
   602  		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
   603  			svc.Spec.LoadBalancerSourceRanges = nil
   604  		})
   605  		framework.ExpectNoError(err)
   606  		// We should wait until service changes are actually propagates, as this may take
   607  		// significant amount of time, especially in large clusters.
   608  		// However, the information whether it was already programmed isn't achievable.
   609  		// So we're resolving it by using loadBalancerCreateTimeout that takes cluster size into account.
   610  		checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, acceptPod.Name, svcIP)
   611  		checkReachabilityFromPod(true, loadBalancerCreateTimeout, namespace, dropPod.Name, svcIP)
   612  	})
   613  
   614  	f.It("should be able to create an internal type load balancer", f.WithSlow(), func(ctx context.Context) {
   615  		e2eskipper.SkipUnlessProviderIs("gke", "gce")
   616  
   617  		createTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
   618  		pollInterval := framework.Poll * 10
   619  
   620  		namespace := f.Namespace.Name
   621  		serviceName := "lb-internal"
   622  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
   623  
   624  		ginkgo.By("creating pod to be part of service " + serviceName)
   625  		_, err := jig.Run(ctx, nil)
   626  		framework.ExpectNoError(err)
   627  
   628  		enableILB, disableILB := enableAndDisableInternalLB()
   629  
   630  		isInternalEndpoint := func(lbIngress *v1.LoadBalancerIngress) bool {
   631  			ingressEndpoint := e2eservice.GetIngressPoint(lbIngress)
   632  			ingressIP := netutils.ParseIPSloppy(ingressEndpoint)
   633  			if ingressIP == nil {
   634  				framework.Failf("invalid ingressEndpoint IP address format: %s", ingressEndpoint)
   635  			}
   636  			// Needs update for providers using hostname as endpoint.
   637  			return subnetPrefix.Contains(ingressIP)
   638  		}
   639  
   640  		ginkgo.By("creating a service with type LoadBalancer and cloud specific Internal-LB annotation enabled")
   641  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
   642  			svc.Spec.Type = v1.ServiceTypeLoadBalancer
   643  			enableILB(svc)
   644  		})
   645  		framework.ExpectNoError(err)
   646  
   647  		ginkgo.DeferCleanup(func(ctx context.Context) {
   648  			ginkgo.By("Clean up loadbalancer service")
   649  			e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name)
   650  		})
   651  
   652  		svc, err = jig.WaitForLoadBalancer(ctx, createTimeout)
   653  		framework.ExpectNoError(err)
   654  		lbIngress := &svc.Status.LoadBalancer.Ingress[0]
   655  		svcPort := int(svc.Spec.Ports[0].Port)
   656  		// should have an internal IP.
   657  		if !isInternalEndpoint(lbIngress) {
   658  			framework.Failf("lbIngress %v doesn't have an internal IP", lbIngress)
   659  		}
   660  
   661  		// ILBs are not accessible from the test orchestrator, so it's necessary to use
   662  		//  a pod to test the service.
   663  		ginkgo.By("hitting the internal load balancer from pod")
   664  		framework.Logf("creating pod with host network")
   665  		hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "ilb-host-exec")
   666  
   667  		framework.Logf("Waiting up to %v for service %q's internal LB to respond to requests", createTimeout, serviceName)
   668  		tcpIngressIP := e2eservice.GetIngressPoint(lbIngress)
   669  		if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) {
   670  			cmd := fmt.Sprintf(`curl -m 5 'http://%v:%v/echo?msg=hello'`, tcpIngressIP, svcPort)
   671  			stdout, err := e2eoutput.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
   672  			if err != nil {
   673  				framework.Logf("error curling; stdout: %v. err: %v", stdout, err)
   674  				return false, nil
   675  			}
   676  
   677  			if !strings.Contains(stdout, "hello") {
   678  				framework.Logf("Expected output to contain 'hello', got %q; retrying...", stdout)
   679  				return false, nil
   680  			}
   681  
   682  			framework.Logf("Successful curl; stdout: %v", stdout)
   683  			return true, nil
   684  		}); pollErr != nil {
   685  			framework.Failf("ginkgo.Failed to hit ILB IP, err: %v", pollErr)
   686  		}
   687  
   688  		ginkgo.By("switching to external type LoadBalancer")
   689  		svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
   690  			disableILB(svc)
   691  		})
   692  		framework.ExpectNoError(err)
   693  		framework.Logf("Waiting up to %v for service %q to have an external LoadBalancer", createTimeout, serviceName)
   694  		if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) {
   695  			svc, err := cs.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
   696  			if err != nil {
   697  				return false, err
   698  			}
   699  			lbIngress = &svc.Status.LoadBalancer.Ingress[0]
   700  			return !isInternalEndpoint(lbIngress), nil
   701  		}); pollErr != nil {
   702  			framework.Failf("Loadbalancer IP not changed to external.")
   703  		}
   704  		// should have an external IP.
   705  		gomega.Expect(isInternalEndpoint(lbIngress)).To(gomega.BeFalse())
   706  
   707  		ginkgo.By("hitting the external load balancer")
   708  		framework.Logf("Waiting up to %v for service %q's external LB to respond to requests", createTimeout, serviceName)
   709  		tcpIngressIP = e2eservice.GetIngressPoint(lbIngress)
   710  		e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, e2eservice.LoadBalancerLagTimeoutDefault)
   711  
   712  		// GCE cannot test a specific IP because the test may not own it. This cloud specific condition
   713  		// will be removed when GCP supports similar functionality.
   714  		if framework.ProviderIs("azure") {
   715  			ginkgo.By("switching back to interal type LoadBalancer, with static IP specified.")
   716  			// For a cluster created with CAPZ, node-subnet may not be "10.240.0.0/16", e.g. "10.1.0.0/16".
   717  			base := netutils.BigForIP(subnetPrefix.IP)
   718  			offset := big.NewInt(0).SetBytes(netutils.ParseIPSloppy("0.0.11.11").To4()).Int64()
   719  
   720  			internalStaticIP := netutils.AddIPOffset(base, int(offset)).String()
   721  
   722  			svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
   723  				svc.Spec.LoadBalancerIP = internalStaticIP
   724  				enableILB(svc)
   725  			})
   726  			framework.ExpectNoError(err)
   727  			framework.Logf("Waiting up to %v for service %q to have an internal LoadBalancer", createTimeout, serviceName)
   728  			if pollErr := wait.PollImmediate(pollInterval, createTimeout, func() (bool, error) {
   729  				svc, err := cs.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
   730  				if err != nil {
   731  					return false, err
   732  				}
   733  				lbIngress = &svc.Status.LoadBalancer.Ingress[0]
   734  				return isInternalEndpoint(lbIngress), nil
   735  			}); pollErr != nil {
   736  				framework.Failf("Loadbalancer IP not changed to internal.")
   737  			}
   738  			// should have the given static internal IP.
   739  			gomega.Expect(e2eservice.GetIngressPoint(lbIngress)).To(gomega.Equal(internalStaticIP))
   740  		}
   741  	})
   742  
   743  	// [LinuxOnly]: Windows does not support session affinity.
   744  	f.It("should have session affinity work for LoadBalancer service with ESIPP on", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) {
   745  		// L4 load balancer affinity `ClientIP` is not supported on AWS ELB.
   746  		e2eskipper.SkipIfProviderIs("aws")
   747  
   748  		svc := getServeHostnameService("affinity-lb-esipp")
   749  		svc.Spec.Type = v1.ServiceTypeLoadBalancer
   750  		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
   751  		execAffinityTestForLBService(ctx, f, cs, svc)
   752  	})
   753  
   754  	// [LinuxOnly]: Windows does not support session affinity.
   755  	f.It("should be able to switch session affinity for LoadBalancer service with ESIPP on", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) {
   756  		// L4 load balancer affinity `ClientIP` is not supported on AWS ELB.
   757  		e2eskipper.SkipIfProviderIs("aws")
   758  
   759  		svc := getServeHostnameService("affinity-lb-esipp-transition")
   760  		svc.Spec.Type = v1.ServiceTypeLoadBalancer
   761  		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
   762  		execAffinityTestForLBServiceWithTransition(ctx, f, cs, svc)
   763  	})
   764  
   765  	// [LinuxOnly]: Windows does not support session affinity.
   766  	f.It("should have session affinity work for LoadBalancer service with ESIPP off", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) {
   767  		// L4 load balancer affinity `ClientIP` is not supported on AWS ELB.
   768  		e2eskipper.SkipIfProviderIs("aws")
   769  
   770  		svc := getServeHostnameService("affinity-lb")
   771  		svc.Spec.Type = v1.ServiceTypeLoadBalancer
   772  		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
   773  		execAffinityTestForLBService(ctx, f, cs, svc)
   774  	})
   775  
   776  	// [LinuxOnly]: Windows does not support session affinity.
   777  	f.It("should be able to switch session affinity for LoadBalancer service with ESIPP off", f.WithSlow(), "[LinuxOnly]", func(ctx context.Context) {
   778  		// L4 load balancer affinity `ClientIP` is not supported on AWS ELB.
   779  		e2eskipper.SkipIfProviderIs("aws")
   780  
   781  		svc := getServeHostnameService("affinity-lb-transition")
   782  		svc.Spec.Type = v1.ServiceTypeLoadBalancer
   783  		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
   784  		execAffinityTestForLBServiceWithTransition(ctx, f, cs, svc)
   785  	})
   786  
   787  	// This test verifies if service load balancer cleanup finalizer is properly
   788  	// handled during service lifecycle.
   789  	// 1. Create service with type=LoadBalancer. Finalizer should be added.
   790  	// 2. Update service to type=ClusterIP. Finalizer should be removed.
   791  	// 3. Update service to type=LoadBalancer. Finalizer should be added.
   792  	// 4. Delete service with type=LoadBalancer. Finalizer should be removed.
   793  	f.It("should handle load balancer cleanup finalizer for service", f.WithSlow(), func(ctx context.Context) {
   794  		jig := e2eservice.NewTestJig(cs, f.Namespace.Name, "lb-finalizer")
   795  
   796  		ginkgo.By("Create load balancer service")
   797  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
   798  			svc.Spec.Type = v1.ServiceTypeLoadBalancer
   799  		})
   800  		framework.ExpectNoError(err)
   801  
   802  		ginkgo.DeferCleanup(func(ctx context.Context) {
   803  			ginkgo.By("Check that service can be deleted with finalizer")
   804  			e2eservice.WaitForServiceDeletedWithFinalizer(ctx, cs, svc.Namespace, svc.Name)
   805  		})
   806  
   807  		ginkgo.By("Wait for load balancer to serve traffic")
   808  		svc, err = jig.WaitForLoadBalancer(ctx, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
   809  		framework.ExpectNoError(err)
   810  
   811  		ginkgo.By("Check if finalizer presents on service with type=LoadBalancer")
   812  		e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, true)
   813  
   814  		ginkgo.By("Check if finalizer is removed on service after changed to type=ClusterIP")
   815  		err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
   816  		framework.ExpectNoError(err)
   817  		e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, false)
   818  
   819  		ginkgo.By("Check if finalizer is added back to service after changed to type=LoadBalancer")
   820  		err = jig.ChangeServiceType(ctx, v1.ServiceTypeLoadBalancer, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
   821  		framework.ExpectNoError(err)
   822  		e2eservice.WaitForServiceUpdatedWithFinalizer(ctx, cs, svc.Namespace, svc.Name, true)
   823  	})
   824  
   825  	f.It("should be able to create LoadBalancer Service without NodePort and change it", f.WithSlow(), func(ctx context.Context) {
   826  		// requires cloud load-balancer support
   827  		e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
   828  
   829  		loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
   830  		if framework.ProviderIs("aws") {
   831  			loadBalancerLagTimeout = e2eservice.LoadBalancerLagTimeoutAWS
   832  		}
   833  		loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
   834  
   835  		// This test is more monolithic than we'd like because LB turnup can be
   836  		// very slow, so we lumped all the tests into one LB lifecycle.
   837  
   838  		serviceName := "reallocate-nodeport-test"
   839  		ns1 := f.Namespace.Name // LB1 in ns1 on TCP
   840  		framework.Logf("namespace for TCP test: %s", ns1)
   841  
   842  		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1)
   843  		tcpJig := e2eservice.NewTestJig(cs, ns1, serviceName)
   844  		tcpService, err := tcpJig.CreateTCPService(ctx, nil)
   845  		framework.ExpectNoError(err)
   846  
   847  		svcPort := int(tcpService.Spec.Ports[0].Port)
   848  		framework.Logf("service port TCP: %d", svcPort)
   849  
   850  		ginkgo.By("creating a pod to be part of the TCP service " + serviceName)
   851  		_, err = tcpJig.Run(ctx, nil)
   852  		framework.ExpectNoError(err)
   853  
   854  		// Change the services to LoadBalancer.
   855  
   856  		// Here we test that LoadBalancers can receive static IP addresses.  This isn't
   857  		// necessary, but is an additional feature this monolithic test checks.
   858  		requestedIP := ""
   859  		staticIPName := ""
   860  		if framework.ProviderIs("gce", "gke") {
   861  			ginkgo.By("creating a static load balancer IP")
   862  			staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunID)
   863  			gceCloud, err := gce.GetGCECloud()
   864  			framework.ExpectNoError(err, "failed to get GCE cloud provider")
   865  
   866  			err = gceCloud.ReserveRegionAddress(&compute.Address{Name: staticIPName}, gceCloud.Region())
   867  			ginkgo.DeferCleanup(func(ctx context.Context) {
   868  				if staticIPName != "" {
   869  					// Release GCE static IP - this is not kube-managed and will not be automatically released.
   870  					if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
   871  						framework.Logf("failed to release static IP %s: %v", staticIPName, err)
   872  					}
   873  				}
   874  			})
   875  			framework.ExpectNoError(err, "failed to create region address: %s", staticIPName)
   876  			reservedAddr, err := gceCloud.GetRegionAddress(staticIPName, gceCloud.Region())
   877  			framework.ExpectNoError(err, "failed to get region address: %s", staticIPName)
   878  
   879  			requestedIP = reservedAddr.Address
   880  			framework.Logf("Allocated static load balancer IP: %s", requestedIP)
   881  		}
   882  
   883  		ginkgo.By("changing the TCP service to type=LoadBalancer")
   884  		_, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
   885  			s.Spec.LoadBalancerIP = requestedIP // will be "" if not applicable
   886  			s.Spec.Type = v1.ServiceTypeLoadBalancer
   887  			s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(false)
   888  		})
   889  		framework.ExpectNoError(err)
   890  
   891  		ginkgo.By("waiting for the TCP service to have a load balancer")
   892  		// Wait for the load balancer to be created asynchronously
   893  		tcpService, err = tcpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
   894  		framework.ExpectNoError(err)
   895  		if int(tcpService.Spec.Ports[0].NodePort) != 0 {
   896  			framework.Failf("TCP Spec.Ports[0].NodePort allocated %d when not expected", tcpService.Spec.Ports[0].NodePort)
   897  		}
   898  		if requestedIP != "" && e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP {
   899  			framework.Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
   900  		}
   901  		tcpIngressIP := e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
   902  		framework.Logf("TCP load balancer: %s", tcpIngressIP)
   903  
   904  		if framework.ProviderIs("gce", "gke") {
   905  			// Do this as early as possible, which overrides the `defer` above.
   906  			// This is mostly out of fear of leaking the IP in a timeout case
   907  			// (as of this writing we're not 100% sure where the leaks are
   908  			// coming from, so this is first-aid rather than surgery).
   909  			ginkgo.By("demoting the static IP to ephemeral")
   910  			if staticIPName != "" {
   911  				gceCloud, err := gce.GetGCECloud()
   912  				framework.ExpectNoError(err, "failed to get GCE cloud provider")
   913  				// Deleting it after it is attached "demotes" it to an
   914  				// ephemeral IP, which can be auto-released.
   915  				if err := gceCloud.DeleteRegionAddress(staticIPName, gceCloud.Region()); err != nil {
   916  					framework.Failf("failed to release static IP %s: %v", staticIPName, err)
   917  				}
   918  				staticIPName = ""
   919  			}
   920  		}
   921  
   922  		ginkgo.By("hitting the TCP service's LoadBalancer")
   923  		e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout)
   924  
   925  		// Change the services' node ports.
   926  
   927  		ginkgo.By("adding a TCP service's NodePort")
   928  		tcpService, err = tcpJig.UpdateService(ctx, func(s *v1.Service) {
   929  			s.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
   930  		})
   931  		framework.ExpectNoError(err)
   932  		tcpNodePort := int(tcpService.Spec.Ports[0].NodePort)
   933  		if tcpNodePort == 0 {
   934  			framework.Failf("TCP Spec.Ports[0].NodePort (%d) not allocated", tcpNodePort)
   935  		}
   936  		if e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
   937  			framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
   938  		}
   939  		framework.Logf("TCP node port: %d", tcpNodePort)
   940  
   941  		ginkgo.By("hitting the TCP service's LoadBalancer")
   942  		e2eservice.TestReachableHTTP(ctx, tcpIngressIP, svcPort, loadBalancerLagTimeout)
   943  	})
   944  
   945  	ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a LoadBalancer service on different nodes", func(ctx context.Context) {
   946  		// requires cloud load-balancer support
   947  		e2eskipper.SkipUnlessProviderIs("gce", "gke", "azure")
   948  		ns := f.Namespace.Name
   949  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
   950  		framework.ExpectNoError(err)
   951  		if len(nodes.Items) < 2 {
   952  			e2eskipper.Skipf(
   953  				"Test requires >= 2 Ready nodes, but there are only %v nodes",
   954  				len(nodes.Items))
   955  		}
   956  
   957  		loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
   958  		loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
   959  
   960  		// Create a LoadBalancer service
   961  		udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
   962  		ginkgo.By("creating a UDP service " + serviceName + " with type=LoadBalancer in " + ns)
   963  		_, err = udpJig.CreateUDPService(ctx, func(svc *v1.Service) {
   964  			svc.Spec.Type = v1.ServiceTypeLoadBalancer
   965  			svc.Spec.Ports = []v1.ServicePort{
   966  				{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)},
   967  			}
   968  		})
   969  		framework.ExpectNoError(err)
   970  
   971  		var udpIngressIP string
   972  		ginkgo.By("waiting for the UDP service to have a load balancer")
   973  		udpService, err := udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
   974  		framework.ExpectNoError(err)
   975  
   976  		udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
   977  		framework.Logf("UDP load balancer: %s", udpIngressIP)
   978  
   979  		// keep hitting the loadbalancer to check it fails over to the second pod
   980  		ginkgo.By("hitting the UDP service's LoadBalancer with same source port")
   981  		stopCh := make(chan struct{})
   982  		defer close(stopCh)
   983  		var mu sync.Mutex
   984  		hostnames := sets.NewString()
   985  		go func() {
   986  			defer ginkgo.GinkgoRecover()
   987  			port := int(udpService.Spec.Ports[0].Port)
   988  			laddr, err := net.ResolveUDPAddr("udp", ":54321")
   989  			if err != nil {
   990  				framework.Failf("Failed to resolve local address: %v", err)
   991  			}
   992  			raddr := net.UDPAddr{IP: netutils.ParseIPSloppy(udpIngressIP), Port: port}
   993  
   994  			for {
   995  				select {
   996  				case <-stopCh:
   997  					if len(hostnames) != 2 {
   998  						framework.Failf("Failed to hit the 2 UDP LoadBalancer backends successfully, got %v", hostnames.List())
   999  					}
  1000  					return
  1001  				default:
  1002  					time.Sleep(1 * time.Second)
  1003  				}
  1004  
  1005  				conn, err := net.DialUDP("udp", laddr, &raddr)
  1006  				if err != nil {
  1007  					framework.Logf("Failed to connect to: %s %d", udpIngressIP, port)
  1008  					continue
  1009  				}
  1010  				conn.SetDeadline(time.Now().Add(3 * time.Second))
  1011  				framework.Logf("Connected successfully to: %s", raddr.String())
  1012  				conn.Write([]byte("hostname\n"))
  1013  				buff := make([]byte, 1024)
  1014  				n, _, err := conn.ReadFrom(buff)
  1015  				if err == nil {
  1016  					mu.Lock()
  1017  					hostnames.Insert(string(buff[:n]))
  1018  					mu.Unlock()
  1019  					framework.Logf("Connected successfully to hostname: %s", string(buff[:n]))
  1020  				}
  1021  				conn.Close()
  1022  			}
  1023  		}()
  1024  
  1025  		// Add a backend pod to the service in one node
  1026  		ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
  1027  		serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
  1028  		serverPod1.Labels = udpJig.Labels
  1029  		serverPod1.Spec.Hostname = "hostname1"
  1030  		nodeSelection := e2epod.NodeSelection{Name: nodes.Items[0].Name}
  1031  		e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
  1032  		e2epod.NewPodClient(f).CreateSync(ctx, serverPod1)
  1033  
  1034  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend1: {80}})
  1035  
  1036  		// Note that the fact that Endpoints object already exists, does NOT mean
  1037  		// that iptables (or whatever else is used) was already programmed.
  1038  		// Additionally take into account that UDP conntract entries timeout is
  1039  		// 30 seconds by default.
  1040  		// Based on the above check if the pod receives the traffic.
  1041  		ginkgo.By("checking client pod connected to the backend 1 on Node " + nodes.Items[0].Name)
  1042  		if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) {
  1043  			mu.Lock()
  1044  			defer mu.Unlock()
  1045  			return hostnames.Has(serverPod1.Spec.Hostname), nil
  1046  		}); err != nil {
  1047  			framework.Failf("Failed to connect to backend 1")
  1048  		}
  1049  
  1050  		// Create a second pod
  1051  		ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
  1052  		serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
  1053  		serverPod2.Labels = udpJig.Labels
  1054  		serverPod2.Spec.Hostname = "hostname2"
  1055  		nodeSelection = e2epod.NodeSelection{Name: nodes.Items[1].Name}
  1056  		e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection)
  1057  		e2epod.NewPodClient(f).CreateSync(ctx, serverPod2)
  1058  
  1059  		// and delete the first pod
  1060  		framework.Logf("Cleaning up %s pod", podBackend1)
  1061  		e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout)
  1062  
  1063  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend2: {80}})
  1064  
  1065  		// Check that the second pod keeps receiving traffic
  1066  		// UDP conntrack entries timeout is 30 sec by default
  1067  		ginkgo.By("checking client pod connected to the backend 2 on Node " + nodes.Items[1].Name)
  1068  		if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) {
  1069  			mu.Lock()
  1070  			defer mu.Unlock()
  1071  			return hostnames.Has(serverPod2.Spec.Hostname), nil
  1072  		}); err != nil {
  1073  			framework.Failf("Failed to connect to backend 2")
  1074  		}
  1075  	})
  1076  
  1077  	ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a LoadBalancer service on the same nodes", func(ctx context.Context) {
  1078  		// requires cloud load-balancer support
  1079  		e2eskipper.SkipUnlessProviderIs("gce", "gke", "azure")
  1080  		ns := f.Namespace.Name
  1081  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 1)
  1082  		framework.ExpectNoError(err)
  1083  		if len(nodes.Items) < 1 {
  1084  			e2eskipper.Skipf(
  1085  				"Test requires >= 1 Ready nodes, but there are only %d nodes",
  1086  				len(nodes.Items))
  1087  		}
  1088  
  1089  		loadBalancerLagTimeout := e2eservice.LoadBalancerLagTimeoutDefault
  1090  		loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
  1091  
  1092  		// Create a LoadBalancer service
  1093  		udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
  1094  		ginkgo.By("creating a UDP service " + serviceName + " with type=LoadBalancer in " + ns)
  1095  		_, err = udpJig.CreateUDPService(ctx, func(svc *v1.Service) {
  1096  			svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1097  			svc.Spec.Ports = []v1.ServicePort{
  1098  				{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)},
  1099  			}
  1100  		})
  1101  		framework.ExpectNoError(err)
  1102  
  1103  		var udpIngressIP string
  1104  		ginkgo.By("waiting for the UDP service to have a load balancer")
  1105  		udpService, err := udpJig.WaitForLoadBalancer(ctx, loadBalancerCreateTimeout)
  1106  		framework.ExpectNoError(err)
  1107  
  1108  		udpIngressIP = e2eservice.GetIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
  1109  		framework.Logf("UDP load balancer: %s", udpIngressIP)
  1110  
  1111  		// keep hitting the loadbalancer to check it fails over to the second pod
  1112  		ginkgo.By("hitting the UDP service's LoadBalancer with same source port")
  1113  		stopCh := make(chan struct{})
  1114  		defer close(stopCh)
  1115  		var mu sync.Mutex
  1116  		hostnames := sets.NewString()
  1117  		go func() {
  1118  			defer ginkgo.GinkgoRecover()
  1119  			port := int(udpService.Spec.Ports[0].Port)
  1120  			laddr, err := net.ResolveUDPAddr("udp", ":54322")
  1121  			if err != nil {
  1122  				framework.Failf("Failed to resolve local address: %v", err)
  1123  			}
  1124  			raddr := net.UDPAddr{IP: netutils.ParseIPSloppy(udpIngressIP), Port: port}
  1125  
  1126  			for {
  1127  				select {
  1128  				case <-stopCh:
  1129  					if len(hostnames) != 2 {
  1130  						framework.Failf("Failed to hit the 2 UDP LoadBalancer backends successfully, got %v", hostnames.List())
  1131  					}
  1132  					return
  1133  				default:
  1134  					time.Sleep(1 * time.Second)
  1135  				}
  1136  
  1137  				conn, err := net.DialUDP("udp", laddr, &raddr)
  1138  				if err != nil {
  1139  					framework.Logf("Failed to connect to: %s %d", udpIngressIP, port)
  1140  					continue
  1141  				}
  1142  				conn.SetDeadline(time.Now().Add(3 * time.Second))
  1143  				framework.Logf("Connected successfully to: %s", raddr.String())
  1144  				conn.Write([]byte("hostname\n"))
  1145  				buff := make([]byte, 1024)
  1146  				n, _, err := conn.ReadFrom(buff)
  1147  				if err == nil {
  1148  					mu.Lock()
  1149  					hostnames.Insert(string(buff[:n]))
  1150  					mu.Unlock()
  1151  					framework.Logf("Connected successfully to hostname: %s", string(buff[:n]))
  1152  				}
  1153  				conn.Close()
  1154  			}
  1155  		}()
  1156  
  1157  		// Add a backend pod to the service in one node
  1158  		ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
  1159  		serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
  1160  		serverPod1.Labels = udpJig.Labels
  1161  		serverPod1.Spec.Hostname = "hostname1"
  1162  		nodeSelection := e2epod.NodeSelection{Name: nodes.Items[0].Name}
  1163  		e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
  1164  		e2epod.NewPodClient(f).CreateSync(ctx, serverPod1)
  1165  
  1166  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend1: {80}})
  1167  
  1168  		// Note that the fact that Endpoints object already exists, does NOT mean
  1169  		// that iptables (or whatever else is used) was already programmed.
  1170  		// Additionally take into account that UDP conntract entries timeout is
  1171  		// 30 seconds by default.
  1172  		// Based on the above check if the pod receives the traffic.
  1173  		ginkgo.By("checking client pod connected to the backend 1 on Node " + nodes.Items[0].Name)
  1174  		if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) {
  1175  			mu.Lock()
  1176  			defer mu.Unlock()
  1177  			return hostnames.Has(serverPod1.Spec.Hostname), nil
  1178  		}); err != nil {
  1179  			framework.Failf("Failed to connect to backend 1")
  1180  		}
  1181  
  1182  		// Create a second pod on the same node
  1183  		ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
  1184  		serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
  1185  		serverPod2.Labels = udpJig.Labels
  1186  		serverPod2.Spec.Hostname = "hostname2"
  1187  		// use the same node as previous pod
  1188  		e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection)
  1189  		e2epod.NewPodClient(f).CreateSync(ctx, serverPod2)
  1190  
  1191  		// and delete the first pod
  1192  		framework.Logf("Cleaning up %s pod", podBackend1)
  1193  		e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout)
  1194  
  1195  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podBackend2: {80}})
  1196  
  1197  		// Check that the second pod keeps receiving traffic
  1198  		// UDP conntrack entries timeout is 30 sec by default
  1199  		ginkgo.By("checking client pod connected to the backend 2 on Node " + nodes.Items[0].Name)
  1200  		if err := wait.PollImmediate(1*time.Second, loadBalancerLagTimeout, func() (bool, error) {
  1201  			mu.Lock()
  1202  			defer mu.Unlock()
  1203  			return hostnames.Has(serverPod2.Spec.Hostname), nil
  1204  		}); err != nil {
  1205  			framework.Failf("Failed to connect to backend 2")
  1206  		}
  1207  	})
  1208  
  1209  	f.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Cluster", f.WithSlow(), func(ctx context.Context) {
  1210  		// We start with a low but reasonable threshold to analyze the results.
  1211  		// The goal is to achieve 99% minimum success rate.
  1212  		// TODO: We should do incremental steps toward the goal.
  1213  		minSuccessRate := 0.95
  1214  
  1215  		testRollingUpdateLBConnectivityDisruption(ctx, f, v1.ServiceExternalTrafficPolicyTypeCluster, minSuccessRate)
  1216  	})
  1217  
  1218  	f.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Local", f.WithSlow(), func(ctx context.Context) {
  1219  		// We start with a low but reasonable threshold to analyze the results.
  1220  		// The goal is to achieve 99% minimum success rate.
  1221  		// TODO: We should do incremental steps toward the goal.
  1222  		minSuccessRate := 0.95
  1223  
  1224  		testRollingUpdateLBConnectivityDisruption(ctx, f, v1.ServiceExternalTrafficPolicyTypeLocal, minSuccessRate)
  1225  	})
  1226  })
  1227  
  1228  var _ = common.SIGDescribe("LoadBalancers ESIPP", framework.WithSlow(), func() {
  1229  	f := framework.NewDefaultFramework("esipp")
  1230  	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
  1231  	var loadBalancerCreateTimeout time.Duration
  1232  
  1233  	var cs clientset.Interface
  1234  	var subnetPrefix *net.IPNet
  1235  	var err error
  1236  
  1237  	ginkgo.BeforeEach(func(ctx context.Context) {
  1238  		// requires cloud load-balancer support - this feature currently supported only on GCE/GKE
  1239  		e2eskipper.SkipUnlessProviderIs("gce", "gke")
  1240  
  1241  		cs = f.ClientSet
  1242  		loadBalancerCreateTimeout = e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
  1243  		subnetPrefix, err = getSubnetPrefix(ctx, cs)
  1244  		framework.ExpectNoError(err)
  1245  	})
  1246  
  1247  	ginkgo.AfterEach(func(ctx context.Context) {
  1248  		if ginkgo.CurrentSpecReport().Failed() {
  1249  			DescribeSvc(f.Namespace.Name)
  1250  		}
  1251  	})
  1252  
  1253  	ginkgo.It("should work for type=LoadBalancer", func(ctx context.Context) {
  1254  		namespace := f.Namespace.Name
  1255  		serviceName := "external-local-lb"
  1256  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
  1257  
  1258  		svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
  1259  		framework.ExpectNoError(err)
  1260  		healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
  1261  		if healthCheckNodePort == 0 {
  1262  			framework.Failf("Service HealthCheck NodePort was not allocated")
  1263  		}
  1264  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1265  			err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
  1266  			framework.ExpectNoError(err)
  1267  
  1268  			// Make sure we didn't leak the health check node port.
  1269  			const threshold = 2
  1270  			nodes, err := getEndpointNodesWithInternalIP(ctx, jig)
  1271  			framework.ExpectNoError(err)
  1272  			config := e2enetwork.NewNetworkingTestConfig(ctx, f)
  1273  			for _, internalIP := range nodes {
  1274  				err := testHTTPHealthCheckNodePortFromTestContainer(ctx,
  1275  					config,
  1276  					internalIP,
  1277  					healthCheckNodePort,
  1278  					e2eservice.KubeProxyLagTimeout,
  1279  					false,
  1280  					threshold)
  1281  				framework.ExpectNoError(err)
  1282  			}
  1283  			err = cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
  1284  			framework.ExpectNoError(err)
  1285  		})
  1286  
  1287  		svcTCPPort := int(svc.Spec.Ports[0].Port)
  1288  		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
  1289  
  1290  		ginkgo.By("reading clientIP using the TCP service's service port via its external VIP")
  1291  		clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip")
  1292  		framework.ExpectNoError(err)
  1293  		framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIPPort)
  1294  
  1295  		ginkgo.By("checking if Source IP is preserved")
  1296  		// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
  1297  		host, _, err := net.SplitHostPort(clientIPPort)
  1298  		if err != nil {
  1299  			framework.Failf("SplitHostPort returned unexpected error: %q", clientIPPort)
  1300  		}
  1301  		ip := netutils.ParseIPSloppy(host)
  1302  		if ip == nil {
  1303  			framework.Failf("Invalid client IP address format: %q", host)
  1304  		}
  1305  		if subnetPrefix.Contains(ip) {
  1306  			framework.Failf("Source IP was NOT preserved")
  1307  		}
  1308  	})
  1309  
  1310  	ginkgo.It("should work for type=NodePort", func(ctx context.Context) {
  1311  		namespace := f.Namespace.Name
  1312  		serviceName := "external-local-nodeport"
  1313  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
  1314  
  1315  		svc, err := jig.CreateOnlyLocalNodePortService(ctx, true)
  1316  		framework.ExpectNoError(err)
  1317  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1318  			err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
  1319  			framework.ExpectNoError(err)
  1320  		})
  1321  
  1322  		tcpNodePort := int(svc.Spec.Ports[0].NodePort)
  1323  
  1324  		endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig)
  1325  		framework.ExpectNoError(err)
  1326  
  1327  		dialCmd := "clientip"
  1328  		config := e2enetwork.NewNetworkingTestConfig(ctx, f)
  1329  
  1330  		for nodeName, nodeIP := range endpointsNodeMap {
  1331  			ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd))
  1332  			clientIP, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
  1333  			framework.ExpectNoError(err)
  1334  			framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP)
  1335  			// the clientIP returned by agnhost contains port
  1336  			if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) {
  1337  				framework.Failf("Source IP was NOT preserved")
  1338  			}
  1339  		}
  1340  	})
  1341  
  1342  	ginkgo.It("should only target nodes with endpoints", func(ctx context.Context) {
  1343  		namespace := f.Namespace.Name
  1344  		serviceName := "external-local-nodes"
  1345  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
  1346  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
  1347  		framework.ExpectNoError(err)
  1348  
  1349  		svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, false,
  1350  			func(svc *v1.Service) {
  1351  				// Change service port to avoid collision with opened hostPorts
  1352  				// in other tests that run in parallel.
  1353  				if len(svc.Spec.Ports) != 0 {
  1354  					svc.Spec.Ports[0].TargetPort = intstr.FromInt32(svc.Spec.Ports[0].Port)
  1355  					svc.Spec.Ports[0].Port = 8081
  1356  				}
  1357  
  1358  			})
  1359  		framework.ExpectNoError(err)
  1360  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1361  			err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
  1362  			framework.ExpectNoError(err)
  1363  			err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
  1364  			framework.ExpectNoError(err)
  1365  		})
  1366  
  1367  		healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
  1368  		if healthCheckNodePort == 0 {
  1369  			framework.Failf("Service HealthCheck NodePort was not allocated")
  1370  		}
  1371  
  1372  		ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
  1373  
  1374  		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
  1375  		svcTCPPort := int(svc.Spec.Ports[0].Port)
  1376  
  1377  		const threshold = 2
  1378  		config := e2enetwork.NewNetworkingTestConfig(ctx, f)
  1379  		for i := 0; i < len(nodes.Items); i++ {
  1380  			endpointNodeName := nodes.Items[i].Name
  1381  
  1382  			ginkgo.By("creating a pod to be part of the service " + serviceName + " on node " + endpointNodeName)
  1383  			_, err = jig.Run(ctx, func(rc *v1.ReplicationController) {
  1384  				rc.Name = serviceName
  1385  				if endpointNodeName != "" {
  1386  					rc.Spec.Template.Spec.NodeName = endpointNodeName
  1387  				}
  1388  			})
  1389  			framework.ExpectNoError(err)
  1390  
  1391  			ginkgo.By(fmt.Sprintf("waiting for service endpoint on node %v", endpointNodeName))
  1392  			err = jig.WaitForEndpointOnNode(ctx, endpointNodeName)
  1393  			framework.ExpectNoError(err)
  1394  
  1395  			// HealthCheck should pass only on the node where num(endpoints) > 0
  1396  			// All other nodes should fail the healthcheck on the service healthCheckNodePort
  1397  			for n, internalIP := range ips {
  1398  				// Make sure the loadbalancer picked up the health check change.
  1399  				// Confirm traffic can reach backend through LB before checking healthcheck nodeport.
  1400  				e2eservice.TestReachableHTTP(ctx, ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout)
  1401  				expectedSuccess := nodes.Items[n].Name == endpointNodeName
  1402  				port := strconv.Itoa(healthCheckNodePort)
  1403  				ipPort := net.JoinHostPort(internalIP, port)
  1404  				framework.Logf("Health checking %s, http://%s/healthz, expectedSuccess %v", nodes.Items[n].Name, ipPort, expectedSuccess)
  1405  				err := testHTTPHealthCheckNodePortFromTestContainer(ctx,
  1406  					config,
  1407  					internalIP,
  1408  					healthCheckNodePort,
  1409  					e2eservice.KubeProxyEndpointLagTimeout,
  1410  					expectedSuccess,
  1411  					threshold)
  1412  				framework.ExpectNoError(err)
  1413  			}
  1414  			framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, namespace, serviceName))
  1415  		}
  1416  	})
  1417  
  1418  	ginkgo.It("should work from pods", func(ctx context.Context) {
  1419  		var err error
  1420  		namespace := f.Namespace.Name
  1421  		serviceName := "external-local-pods"
  1422  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
  1423  
  1424  		svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
  1425  		framework.ExpectNoError(err)
  1426  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1427  			err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
  1428  			framework.ExpectNoError(err)
  1429  			err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
  1430  			framework.ExpectNoError(err)
  1431  		})
  1432  
  1433  		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
  1434  		port := strconv.Itoa(int(svc.Spec.Ports[0].Port))
  1435  		ipPort := net.JoinHostPort(ingressIP, port)
  1436  		path := fmt.Sprintf("%s/clientip", ipPort)
  1437  
  1438  		ginkgo.By("Creating pause pod deployment to make sure, pausePods are in desired state")
  1439  		deployment := createPausePodDeployment(ctx, cs, "pause-pod-deployment", namespace, 1)
  1440  		framework.ExpectNoError(e2edeployment.WaitForDeploymentComplete(cs, deployment), "Failed to complete pause pod deployment")
  1441  
  1442  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1443  			framework.Logf("Deleting deployment")
  1444  			err = cs.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{})
  1445  			framework.ExpectNoError(err, "Failed to delete deployment %s", deployment.Name)
  1446  		})
  1447  
  1448  		deployment, err = cs.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
  1449  		framework.ExpectNoError(err, "Error in retrieving pause pod deployment")
  1450  		labelSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
  1451  		framework.ExpectNoError(err, "Error in setting LabelSelector as selector from deployment")
  1452  
  1453  		pausePods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()})
  1454  		framework.ExpectNoError(err, "Error in listing pods associated with pause pod deployments")
  1455  
  1456  		pausePod := pausePods.Items[0]
  1457  		framework.Logf("Waiting up to %v curl %v", e2eservice.KubeProxyLagTimeout, path)
  1458  		cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %v`, path)
  1459  
  1460  		var srcIP string
  1461  		loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)
  1462  		ginkgo.By(fmt.Sprintf("Hitting external lb %v from pod %v on node %v", ingressIP, pausePod.Name, pausePod.Spec.NodeName))
  1463  		if pollErr := wait.PollImmediate(framework.Poll, loadBalancerPropagationTimeout, func() (bool, error) {
  1464  			stdout, err := e2eoutput.RunHostCmd(pausePod.Namespace, pausePod.Name, cmd)
  1465  			if err != nil {
  1466  				framework.Logf("got err: %v, retry until timeout", err)
  1467  				return false, nil
  1468  			}
  1469  			srcIP = strings.TrimSpace(strings.Split(stdout, ":")[0])
  1470  			return srcIP == pausePod.Status.PodIP, nil
  1471  		}); pollErr != nil {
  1472  			framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", pausePod.Name, pausePod.Status.PodIP, srcIP)
  1473  		}
  1474  	})
  1475  
  1476  	ginkgo.It("should handle updates to ExternalTrafficPolicy field", func(ctx context.Context) {
  1477  		namespace := f.Namespace.Name
  1478  		serviceName := "external-local-update"
  1479  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
  1480  
  1481  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
  1482  		framework.ExpectNoError(err)
  1483  		if len(nodes.Items) < 2 {
  1484  			framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
  1485  		}
  1486  
  1487  		svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
  1488  		framework.ExpectNoError(err)
  1489  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1490  			err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
  1491  			framework.ExpectNoError(err)
  1492  			err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
  1493  			framework.ExpectNoError(err)
  1494  		})
  1495  
  1496  		// save the health check node port because it disappears when ESIPP is turned off.
  1497  		healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
  1498  
  1499  		ginkgo.By("turning ESIPP off")
  1500  		svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  1501  			svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
  1502  		})
  1503  		framework.ExpectNoError(err)
  1504  		if svc.Spec.HealthCheckNodePort > 0 {
  1505  			framework.Failf("Service HealthCheck NodePort still present")
  1506  		}
  1507  
  1508  		epNodes, err := jig.ListNodesWithEndpoint(ctx)
  1509  		framework.ExpectNoError(err)
  1510  		// map from name of nodes with endpoint to internal ip
  1511  		// it is assumed that there is only a single node with the endpoint
  1512  		endpointNodeMap := make(map[string]string)
  1513  		// map from name of nodes without endpoint to internal ip
  1514  		noEndpointNodeMap := make(map[string]string)
  1515  		for _, node := range epNodes {
  1516  			ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
  1517  			if len(ips) < 1 {
  1518  				framework.Failf("No internal ip found for node %s", node.Name)
  1519  			}
  1520  			endpointNodeMap[node.Name] = ips[0]
  1521  		}
  1522  		for _, n := range nodes.Items {
  1523  			ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
  1524  			if len(ips) < 1 {
  1525  				framework.Failf("No internal ip found for node %s", n.Name)
  1526  			}
  1527  			if _, ok := endpointNodeMap[n.Name]; !ok {
  1528  				noEndpointNodeMap[n.Name] = ips[0]
  1529  			}
  1530  		}
  1531  		gomega.Expect(endpointNodeMap).ToNot(gomega.BeEmpty())
  1532  		gomega.Expect(noEndpointNodeMap).ToNot(gomega.BeEmpty())
  1533  
  1534  		svcTCPPort := int(svc.Spec.Ports[0].Port)
  1535  		svcNodePort := int(svc.Spec.Ports[0].NodePort)
  1536  		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
  1537  		path := "/clientip"
  1538  		dialCmd := "clientip"
  1539  
  1540  		config := e2enetwork.NewNetworkingTestConfig(ctx, f)
  1541  
  1542  		ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
  1543  		for nodeName, nodeIP := range noEndpointNodeMap {
  1544  			ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd))
  1545  			_, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
  1546  			framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
  1547  		}
  1548  
  1549  		for nodeName, nodeIP := range endpointNodeMap {
  1550  			ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
  1551  			var body string
  1552  			pollFn := func() (bool, error) {
  1553  				// we expect connection failure here, but not other errors
  1554  				resp, err := config.GetResponseFromTestContainer(ctx,
  1555  					"http",
  1556  					"healthz",
  1557  					nodeIP,
  1558  					healthCheckNodePort)
  1559  				if err != nil {
  1560  					return false, nil
  1561  				}
  1562  				if len(resp.Errors) > 0 {
  1563  					return true, nil
  1564  				}
  1565  				if len(resp.Responses) > 0 {
  1566  					body = resp.Responses[0]
  1567  				}
  1568  				return false, nil
  1569  			}
  1570  			if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollFn); pollErr != nil {
  1571  				framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s",
  1572  					nodeName, healthCheckNodePort, body)
  1573  			}
  1574  		}
  1575  
  1576  		// Poll till kube-proxy re-adds the MASQUERADE rule on the node.
  1577  		ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP))
  1578  		var clientIP string
  1579  		pollErr := wait.PollImmediate(framework.Poll, 3*e2eservice.KubeProxyLagTimeout, func() (bool, error) {
  1580  			clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
  1581  			if err != nil {
  1582  				return false, nil
  1583  			}
  1584  			// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
  1585  			host, _, err := net.SplitHostPort(clientIPPort)
  1586  			if err != nil {
  1587  				framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
  1588  				return false, nil
  1589  			}
  1590  			ip := netutils.ParseIPSloppy(host)
  1591  			if ip == nil {
  1592  				framework.Logf("Invalid client IP address format: %q", host)
  1593  				return false, nil
  1594  			}
  1595  			if subnetPrefix.Contains(ip) {
  1596  				return true, nil
  1597  			}
  1598  			return false, nil
  1599  		})
  1600  		if pollErr != nil {
  1601  			framework.Failf("Source IP WAS preserved even after ESIPP turned off. Got %v, expected a ten-dot cluster ip.", clientIP)
  1602  		}
  1603  
  1604  		// TODO: We need to attempt to create another service with the previously
  1605  		// allocated healthcheck nodePort. If the health check nodePort has been
  1606  		// freed, the new service creation will succeed, upon which we cleanup.
  1607  		// If the health check nodePort has NOT been freed, the new service
  1608  		// creation will fail.
  1609  
  1610  		ginkgo.By("setting ExternalTraffic field back to OnlyLocal")
  1611  		svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  1612  			svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
  1613  			// Request the same healthCheckNodePort as before, to test the user-requested allocation path
  1614  			svc.Spec.HealthCheckNodePort = int32(healthCheckNodePort)
  1615  		})
  1616  		framework.ExpectNoError(err)
  1617  		loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)
  1618  		pollErr = wait.PollImmediate(framework.PollShortTimeout, loadBalancerPropagationTimeout, func() (bool, error) {
  1619  			clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
  1620  			if err != nil {
  1621  				return false, nil
  1622  			}
  1623  			ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIPPort))
  1624  			// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
  1625  			host, _, err := net.SplitHostPort(clientIPPort)
  1626  			if err != nil {
  1627  				framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
  1628  				return false, nil
  1629  			}
  1630  			ip := netutils.ParseIPSloppy(host)
  1631  			if ip == nil {
  1632  				framework.Logf("Invalid client IP address format: %q", host)
  1633  				return false, nil
  1634  			}
  1635  			if !subnetPrefix.Contains(ip) {
  1636  				return true, nil
  1637  			}
  1638  			return false, nil
  1639  		})
  1640  		if pollErr != nil {
  1641  			framework.Failf("Source IP (%v) is not the client IP even after ESIPP turned on, expected a public IP.", clientIP)
  1642  		}
  1643  	})
  1644  })
  1645  
  1646  func testRollingUpdateLBConnectivityDisruption(ctx context.Context, f *framework.Framework, externalTrafficPolicy v1.ServiceExternalTrafficPolicyType, minSuccessRate float64) {
  1647  	cs := f.ClientSet
  1648  	ns := f.Namespace.Name
  1649  	name := "test-lb-rolling-update"
  1650  	labels := map[string]string{"name": name}
  1651  	gracePeriod := int64(60)
  1652  	maxUnavailable := intstr.FromString("10%")
  1653  	ds := e2edaemonset.NewDaemonSet(name, e2eapps.AgnhostImage, labels, nil, nil,
  1654  		[]v1.ContainerPort{
  1655  			{ContainerPort: 80},
  1656  		},
  1657  		"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod),
  1658  	)
  1659  	ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
  1660  		Type: appsv1.RollingUpdateDaemonSetStrategyType,
  1661  		RollingUpdate: &appsv1.RollingUpdateDaemonSet{
  1662  			MaxUnavailable: &maxUnavailable,
  1663  		},
  1664  	}
  1665  	ds.Spec.Template.Labels = labels
  1666  	ds.Spec.Template.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
  1667  
  1668  	nodeNames := e2edaemonset.SchedulableNodes(ctx, cs, ds)
  1669  	e2eskipper.SkipUnlessAtLeast(len(nodeNames), 2, "load-balancer rolling update test requires at least 2 schedulable nodes for the DaemonSet")
  1670  	if len(nodeNames) > 25 {
  1671  		e2eskipper.Skipf("load-balancer rolling update test skipped for large environments with more than 25 nodes")
  1672  	}
  1673  
  1674  	ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", name))
  1675  	ds, err := cs.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
  1676  	framework.ExpectNoError(err)
  1677  
  1678  	ginkgo.By("Checking that daemon pods launch on every schedulable node of the cluster")
  1679  	creationTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
  1680  	err = wait.PollUntilContextTimeout(ctx, framework.Poll, creationTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, nodeNames))
  1681  	framework.ExpectNoError(err, "error waiting for daemon pods to start")
  1682  	err = e2edaemonset.CheckDaemonStatus(ctx, f, name)
  1683  	framework.ExpectNoError(err)
  1684  
  1685  	ginkgo.By(fmt.Sprintf("Creating a service %s with type=LoadBalancer externalTrafficPolicy=%s in namespace %s", name, externalTrafficPolicy, ns))
  1686  	jig := e2eservice.NewTestJig(cs, ns, name)
  1687  	jig.Labels = labels
  1688  	service, err := jig.CreateLoadBalancerService(ctx, creationTimeout, func(svc *v1.Service) {
  1689  		svc.Spec.ExternalTrafficPolicy = externalTrafficPolicy
  1690  	})
  1691  	framework.ExpectNoError(err)
  1692  
  1693  	lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0])
  1694  	svcPort := int(service.Spec.Ports[0].Port)
  1695  
  1696  	ginkgo.By("Hitting the DaemonSet's pods through the service's load balancer")
  1697  	timeout := e2eservice.LoadBalancerLagTimeoutDefault
  1698  	if framework.ProviderIs("aws") {
  1699  		timeout = e2eservice.LoadBalancerLagTimeoutAWS
  1700  	}
  1701  	e2eservice.TestReachableHTTP(ctx, lbNameOrAddress, svcPort, timeout)
  1702  
  1703  	ginkgo.By("Starting a goroutine to continuously hit the DaemonSet's pods through the service's load balancer")
  1704  	var totalRequests uint64 = 0
  1705  	var networkErrors uint64 = 0
  1706  	var httpErrors uint64 = 0
  1707  	done := make(chan struct{})
  1708  	defer close(done)
  1709  	go func() {
  1710  		defer ginkgo.GinkgoRecover()
  1711  
  1712  		wait.Until(func() {
  1713  			atomic.AddUint64(&totalRequests, 1)
  1714  			client := &http.Client{
  1715  				Transport: utilnet.SetTransportDefaults(&http.Transport{
  1716  					DisableKeepAlives: true,
  1717  				}),
  1718  				Timeout: 5 * time.Second,
  1719  			}
  1720  			ipPort := net.JoinHostPort(lbNameOrAddress, strconv.Itoa(svcPort))
  1721  			msg := "hello"
  1722  			url := fmt.Sprintf("http://%s/echo?msg=%s", ipPort, msg)
  1723  			resp, err := client.Get(url)
  1724  			if err != nil {
  1725  				framework.Logf("Got error testing for reachability of %s: %v", url, err)
  1726  				atomic.AddUint64(&networkErrors, 1)
  1727  				return
  1728  			}
  1729  			defer resp.Body.Close()
  1730  			if resp.StatusCode != http.StatusOK {
  1731  				framework.Logf("Got bad status code: %d", resp.StatusCode)
  1732  				atomic.AddUint64(&httpErrors, 1)
  1733  				return
  1734  			}
  1735  			body, err := io.ReadAll(resp.Body)
  1736  			if err != nil {
  1737  				framework.Logf("Got error reading HTTP body: %v", err)
  1738  				atomic.AddUint64(&httpErrors, 1)
  1739  				return
  1740  			}
  1741  			if string(body) != msg {
  1742  				framework.Logf("The response body does not contain expected string %s", string(body))
  1743  				atomic.AddUint64(&httpErrors, 1)
  1744  				return
  1745  			}
  1746  		}, time.Duration(0), done)
  1747  	}()
  1748  
  1749  	ginkgo.By("Triggering DaemonSet rolling update several times")
  1750  	var previousTotalRequests uint64 = 0
  1751  	var previousNetworkErrors uint64 = 0
  1752  	var previousHttpErrors uint64 = 0
  1753  	for i := 1; i <= 5; i++ {
  1754  		framework.Logf("Update daemon pods environment: [{\"name\":\"VERSION\",\"value\":\"%d\"}]", i)
  1755  		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%d"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, i)
  1756  		ds, err = cs.AppsV1().DaemonSets(ns).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
  1757  		framework.ExpectNoError(err)
  1758  
  1759  		framework.Logf("Check that daemon pods are available on every node of the cluster with the updated environment.")
  1760  		err = wait.PollImmediate(framework.Poll, creationTimeout, func() (bool, error) {
  1761  			podList, err := cs.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{})
  1762  			if err != nil {
  1763  				return false, err
  1764  			}
  1765  			pods := podList.Items
  1766  
  1767  			readyPods := 0
  1768  			for _, pod := range pods {
  1769  				if !metav1.IsControlledBy(&pod, ds) {
  1770  					continue
  1771  				}
  1772  				if pod.DeletionTimestamp != nil {
  1773  					continue
  1774  				}
  1775  				podVersion := ""
  1776  				for _, env := range pod.Spec.Containers[0].Env {
  1777  					if env.Name == "VERSION" {
  1778  						podVersion = env.Value
  1779  						break
  1780  					}
  1781  				}
  1782  				if podVersion != fmt.Sprintf("%d", i) {
  1783  					continue
  1784  				}
  1785  				podReady := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
  1786  				if !podReady {
  1787  					continue
  1788  				}
  1789  				readyPods += 1
  1790  			}
  1791  			framework.Logf("Number of running nodes: %d, number of updated ready pods: %d in daemonset %s", len(nodeNames), readyPods, ds.Name)
  1792  			return readyPods == len(nodeNames), nil
  1793  		})
  1794  		framework.ExpectNoError(err, "error waiting for daemon pods to be ready")
  1795  
  1796  		// assert that the HTTP requests success rate is above the acceptable threshold after this rolling update
  1797  		currentTotalRequests := atomic.LoadUint64(&totalRequests)
  1798  		currentNetworkErrors := atomic.LoadUint64(&networkErrors)
  1799  		currentHttpErrors := atomic.LoadUint64(&httpErrors)
  1800  
  1801  		partialTotalRequests := currentTotalRequests - previousTotalRequests
  1802  		partialNetworkErrors := currentNetworkErrors - previousNetworkErrors
  1803  		partialHttpErrors := currentHttpErrors - previousHttpErrors
  1804  		partialSuccessRate := (float64(partialTotalRequests) - float64(partialNetworkErrors+partialHttpErrors)) / float64(partialTotalRequests)
  1805  
  1806  		framework.Logf("Load Balancer total HTTP requests: %d", partialTotalRequests)
  1807  		framework.Logf("Network errors: %d", partialNetworkErrors)
  1808  		framework.Logf("HTTP errors: %d", partialHttpErrors)
  1809  		framework.Logf("Success rate: %.2f%%", partialSuccessRate*100)
  1810  		if partialSuccessRate < minSuccessRate {
  1811  			framework.Failf("Encountered too many errors when doing HTTP requests to the load balancer address. Success rate is %.2f%%, and the minimum allowed threshold is %.2f%%.", partialSuccessRate*100, minSuccessRate*100)
  1812  		}
  1813  
  1814  		previousTotalRequests = currentTotalRequests
  1815  		previousNetworkErrors = currentNetworkErrors
  1816  		previousHttpErrors = currentHttpErrors
  1817  	}
  1818  
  1819  	// assert that the load balancer address is still reachable after the rolling updates are finished
  1820  	e2eservice.TestReachableHTTP(ctx, lbNameOrAddress, svcPort, timeout)
  1821  }
  1822  

View as plain text