...

Source file src/k8s.io/kubernetes/test/e2e/network/service.go

Documentation: k8s.io/kubernetes/test/e2e/network

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package network
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"errors"
    23  	"fmt"
    24  	"math/rand"
    25  	"net"
    26  	"net/http"
    27  	"sort"
    28  	"strconv"
    29  	"strings"
    30  	"time"
    31  
    32  	appsv1 "k8s.io/api/apps/v1"
    33  	v1 "k8s.io/api/core/v1"
    34  	discoveryv1 "k8s.io/api/discovery/v1"
    35  
    36  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    37  	"k8s.io/apimachinery/pkg/api/resource"
    38  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	"k8s.io/apimachinery/pkg/labels"
    40  	"k8s.io/apimachinery/pkg/runtime/schema"
    41  	"k8s.io/apimachinery/pkg/types"
    42  	"k8s.io/apimachinery/pkg/util/intstr"
    43  	utilnet "k8s.io/apimachinery/pkg/util/net"
    44  	utilrand "k8s.io/apimachinery/pkg/util/rand"
    45  	"k8s.io/apimachinery/pkg/util/sets"
    46  	"k8s.io/apimachinery/pkg/util/wait"
    47  	watch "k8s.io/apimachinery/pkg/watch"
    48  	admissionapi "k8s.io/pod-security-admission/api"
    49  
    50  	clientset "k8s.io/client-go/kubernetes"
    51  	"k8s.io/client-go/tools/cache"
    52  	watchtools "k8s.io/client-go/tools/watch"
    53  	"k8s.io/client-go/util/retry"
    54  
    55  	cloudprovider "k8s.io/cloud-provider"
    56  	netutils "k8s.io/utils/net"
    57  	utilpointer "k8s.io/utils/pointer"
    58  
    59  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    60  	"k8s.io/kubernetes/test/e2e/framework"
    61  	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
    62  	e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
    63  	e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice"
    64  	e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
    65  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    66  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    67  	e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
    68  	e2eproviders "k8s.io/kubernetes/test/e2e/framework/providers"
    69  	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
    70  	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
    71  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    72  	"k8s.io/kubernetes/test/e2e/network/common"
    73  	testutils "k8s.io/kubernetes/test/utils"
    74  	imageutils "k8s.io/kubernetes/test/utils/image"
    75  
    76  	"github.com/onsi/ginkgo/v2"
    77  	"github.com/onsi/gomega"
    78  )
    79  
    80  const (
    81  	defaultServeHostnameServicePort = 80
    82  	defaultServeHostnameServiceName = "svc-hostname"
    83  
    84  	// AffinityTimeout is the maximum time that CheckAffinity is allowed to take; this
    85  	// needs to be more than long enough for AffinityConfirmCount HTTP requests to
    86  	// complete in a busy CI cluster, but shouldn't be too long since we will end up
    87  	// waiting the entire time in the tests where affinity is not expected.
    88  	AffinityTimeout = 2 * time.Minute
    89  
    90  	// AffinityConfirmCount is the number of needed continuous requests to confirm that
    91  	// affinity is enabled.
    92  	AffinityConfirmCount = 15
    93  
    94  	// SessionAffinityTimeout is the number of seconds to wait between requests for
    95  	// session affinity to timeout before trying a load-balancer request again
    96  	SessionAffinityTimeout = 125
    97  
    98  	// label define which is used to find kube-proxy and kube-apiserver pod
    99  	kubeProxyLabelName     = "kube-proxy"
   100  	clusterAddonLabelKey   = "k8s-app"
   101  	kubeAPIServerLabelName = "kube-apiserver"
   102  	clusterComponentKey    = "component"
   103  
   104  	svcReadyTimeout = 1 * time.Minute
   105  )
   106  
   107  var (
   108  	defaultServeHostnameService = v1.Service{
   109  		ObjectMeta: metav1.ObjectMeta{
   110  			Name: defaultServeHostnameServiceName,
   111  		},
   112  		Spec: v1.ServiceSpec{
   113  			Ports: []v1.ServicePort{{
   114  				Port:       int32(defaultServeHostnameServicePort),
   115  				TargetPort: intstr.FromInt32(9376),
   116  				Protocol:   v1.ProtocolTCP,
   117  			}},
   118  			Selector: map[string]string{
   119  				"name": defaultServeHostnameServiceName,
   120  			},
   121  		},
   122  	}
   123  )
   124  
   125  // portsByPodName is a map that maps pod name to container ports.
   126  type portsByPodName map[string][]int
   127  
   128  // portsByPodUID is a map that maps pod name to container ports.
   129  type portsByPodUID map[types.UID][]int
   130  
   131  // fullPortsByPodName is a map that maps pod name to container ports including their protocols.
   132  type fullPortsByPodName map[string][]v1.ContainerPort
   133  
   134  // fullPortsByPodUID is a map that maps pod name to container ports.
   135  type fullPortsByPodUID map[types.UID][]v1.ContainerPort
   136  
   137  // affinityCheckFromPod returns interval, timeout and function pinging the service and
   138  // returning pinged hosts for pinging the service from execPod.
   139  func affinityCheckFromPod(execPod *v1.Pod, serviceIP string, servicePort int) (time.Duration, time.Duration, func() []string) {
   140  	timeout := AffinityTimeout
   141  	// interval considering a maximum of 2 seconds per connection
   142  	interval := 2 * AffinityConfirmCount * time.Second
   143  
   144  	serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
   145  	curl := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort)
   146  	cmd := fmt.Sprintf("for i in $(seq 0 %d); do echo; %s ; done", AffinityConfirmCount, curl)
   147  	getHosts := func() []string {
   148  		stdout, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
   149  		if err != nil {
   150  			framework.Logf("Failed to get response from %s. Retry until timeout", serviceIPPort)
   151  			return nil
   152  		}
   153  		return strings.Split(stdout, "\n")
   154  	}
   155  
   156  	return interval, timeout, getHosts
   157  }
   158  
   159  // affinityCheckFromTest returns interval, timeout and function pinging the service and
   160  // returning pinged hosts for pinging the service from the test itself.
   161  func affinityCheckFromTest(ctx context.Context, cs clientset.Interface, serviceIP string, servicePort int) (time.Duration, time.Duration, func() []string) {
   162  	interval := 2 * time.Second
   163  	timeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)
   164  
   165  	params := &e2enetwork.HTTPPokeParams{Timeout: 2 * time.Second}
   166  	getHosts := func() []string {
   167  		var hosts []string
   168  		for i := 0; i < AffinityConfirmCount; i++ {
   169  			result := e2enetwork.PokeHTTP(serviceIP, servicePort, "", params)
   170  			if result.Status == e2enetwork.HTTPSuccess {
   171  				hosts = append(hosts, string(result.Body))
   172  			}
   173  		}
   174  		return hosts
   175  	}
   176  
   177  	return interval, timeout, getHosts
   178  }
   179  
   180  // CheckAffinity function tests whether the service affinity works as expected.
   181  // If affinity is expected, the test will return true once affinityConfirmCount
   182  // number of same response observed in a row. If affinity is not expected, the
   183  // test will keep observe until different responses observed. The function will
   184  // return false only in case of unexpected errors.
   185  func checkAffinity(ctx context.Context, cs clientset.Interface, execPod *v1.Pod, serviceIP string, servicePort int, shouldHold bool) bool {
   186  	var interval, timeout time.Duration
   187  	var getHosts func() []string
   188  	if execPod != nil {
   189  		interval, timeout, getHosts = affinityCheckFromPod(execPod, serviceIP, servicePort)
   190  	} else {
   191  		interval, timeout, getHosts = affinityCheckFromTest(ctx, cs, serviceIP, servicePort)
   192  	}
   193  
   194  	var tracker affinityTracker
   195  	if pollErr := wait.PollImmediate(interval, timeout, func() (bool, error) {
   196  		hosts := getHosts()
   197  		for _, host := range hosts {
   198  			if len(host) > 0 {
   199  				tracker.recordHost(strings.TrimSpace(host))
   200  			}
   201  		}
   202  
   203  		trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount)
   204  		if !trackerFulfilled {
   205  			return false, nil
   206  		}
   207  
   208  		if !shouldHold && !affinityHolds {
   209  			return true, nil
   210  		}
   211  		if shouldHold && affinityHolds {
   212  			return true, nil
   213  		}
   214  		return false, nil
   215  	}); pollErr != nil {
   216  		trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount)
   217  		if !wait.Interrupted(pollErr) {
   218  			checkAffinityFailed(tracker, pollErr.Error())
   219  			return false
   220  		}
   221  		if !trackerFulfilled {
   222  			checkAffinityFailed(tracker, fmt.Sprintf("Connection timed out or not enough responses."))
   223  		}
   224  		if shouldHold {
   225  			checkAffinityFailed(tracker, "Affinity should hold but didn't.")
   226  		} else {
   227  			checkAffinityFailed(tracker, "Affinity shouldn't hold but did.")
   228  		}
   229  		return true
   230  	}
   231  	return true
   232  }
   233  
   234  // affinityTracker tracks the destination of a request for the affinity tests.
   235  type affinityTracker struct {
   236  	hostTrace []string
   237  }
   238  
   239  // Record the response going to a given host.
   240  func (at *affinityTracker) recordHost(host string) {
   241  	at.hostTrace = append(at.hostTrace, host)
   242  	framework.Logf("Received response from host: %s", host)
   243  }
   244  
   245  // Check that we got a constant count requests going to the same host.
   246  func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) {
   247  	fulfilled = (len(at.hostTrace) >= count)
   248  	if len(at.hostTrace) == 0 {
   249  		return fulfilled, true
   250  	}
   251  	last := at.hostTrace[0:]
   252  	if len(at.hostTrace)-count >= 0 {
   253  		last = at.hostTrace[len(at.hostTrace)-count:]
   254  	}
   255  	host := at.hostTrace[len(at.hostTrace)-1]
   256  	for _, h := range last {
   257  		if h != host {
   258  			return fulfilled, false
   259  		}
   260  	}
   261  	return fulfilled, true
   262  }
   263  
   264  func checkAffinityFailed(tracker affinityTracker, err string) {
   265  	framework.Logf("%v", tracker.hostTrace)
   266  	framework.Failf(err)
   267  }
   268  
   269  // StartServeHostnameService creates a replication controller that serves its
   270  // hostname and a service on top of it.
   271  func StartServeHostnameService(ctx context.Context, c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
   272  	podNames := make([]string, replicas)
   273  	name := svc.ObjectMeta.Name
   274  	ginkgo.By("creating service " + name + " in namespace " + ns)
   275  	_, err := c.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
   276  	if err != nil {
   277  		return podNames, "", err
   278  	}
   279  
   280  	var createdPods []*v1.Pod
   281  	maxContainerFailures := 0
   282  	config := testutils.RCConfig{
   283  		Client:               c,
   284  		Image:                framework.ServeHostnameImage,
   285  		Command:              []string{"/agnhost", "serve-hostname"},
   286  		Name:                 name,
   287  		Namespace:            ns,
   288  		PollInterval:         3 * time.Second,
   289  		Timeout:              framework.PodReadyBeforeTimeout,
   290  		Replicas:             replicas,
   291  		CreatedPods:          &createdPods,
   292  		MaxContainerFailures: &maxContainerFailures,
   293  	}
   294  	err = e2erc.RunRC(ctx, config)
   295  	if err != nil {
   296  		return podNames, "", err
   297  	}
   298  
   299  	if len(createdPods) != replicas {
   300  		return podNames, "", fmt.Errorf("incorrect number of running pods: %v", len(createdPods))
   301  	}
   302  
   303  	for i := range createdPods {
   304  		podNames[i] = createdPods[i].ObjectMeta.Name
   305  	}
   306  	sort.StringSlice(podNames).Sort()
   307  
   308  	service, err := c.CoreV1().Services(ns).Get(ctx, name, metav1.GetOptions{})
   309  	if err != nil {
   310  		return podNames, "", err
   311  	}
   312  	if service.Spec.ClusterIP == "" {
   313  		return podNames, "", fmt.Errorf("service IP is blank for %v", name)
   314  	}
   315  	serviceIP := service.Spec.ClusterIP
   316  	return podNames, serviceIP, nil
   317  }
   318  
   319  // StopServeHostnameService stops the given service.
   320  func StopServeHostnameService(ctx context.Context, clientset clientset.Interface, ns, name string) error {
   321  	if err := e2erc.DeleteRCAndWaitForGC(ctx, clientset, ns, name); err != nil {
   322  		return err
   323  	}
   324  	if err := clientset.CoreV1().Services(ns).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
   325  		return err
   326  	}
   327  	return nil
   328  }
   329  
   330  // verifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
   331  // host exec pod of host network type and from the exec pod of container network type.
   332  // Each pod in the service is expected to echo its name. These names are compared with the
   333  // given expectedPods list after a sort | uniq.
   334  func verifyServeHostnameServiceUp(ctx context.Context, c clientset.Interface, ns string, expectedPods []string, serviceIP string, servicePort int) error {
   335  	// to verify from host network
   336  	hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-up-host-exec-pod")
   337  
   338  	// to verify from container's network
   339  	execPod := e2epod.CreateExecPodOrFail(ctx, c, ns, "verify-service-up-exec-pod-", nil)
   340  	defer func() {
   341  		e2epod.DeletePodOrFail(ctx, c, ns, hostExecPod.Name)
   342  		e2epod.DeletePodOrFail(ctx, c, ns, execPod.Name)
   343  	}()
   344  
   345  	// verify service from pod
   346  	cmdFunc := func(podName string) string {
   347  		wgetCmd := "wget -q -O -"
   348  		// Command 'wget' in Windows image may not support option 'T'
   349  		if !framework.NodeOSDistroIs("windows") {
   350  			wgetCmd += " -T 1"
   351  		}
   352  		serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
   353  		cmd := fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done",
   354  			50*len(expectedPods), wgetCmd, serviceIPPort)
   355  		framework.Logf("Executing cmd %q in pod %v/%v", cmd, ns, podName)
   356  		// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
   357  		output, err := e2eoutput.RunHostCmd(ns, podName, cmd)
   358  		if err != nil {
   359  			framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, podName, err, output)
   360  		}
   361  		return output
   362  	}
   363  
   364  	expectedEndpoints := sets.NewString(expectedPods...)
   365  	ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
   366  	for _, podName := range []string{hostExecPod.Name, execPod.Name} {
   367  		passed := false
   368  		gotEndpoints := sets.NewString()
   369  
   370  		// Retry cmdFunc for a while
   371  		for start := time.Now(); time.Since(start) < e2eservice.KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
   372  			for _, endpoint := range strings.Split(cmdFunc(podName), "\n") {
   373  				trimmedEp := strings.TrimSpace(endpoint)
   374  				if trimmedEp != "" {
   375  					gotEndpoints.Insert(trimmedEp)
   376  				}
   377  			}
   378  			// TODO: simply checking that the retrieved endpoints is a superset
   379  			// of the expected allows us to ignore intermitten network flakes that
   380  			// result in output like "wget timed out", but these should be rare
   381  			// and we need a better way to track how often it occurs.
   382  			if gotEndpoints.IsSuperset(expectedEndpoints) {
   383  				if !gotEndpoints.Equal(expectedEndpoints) {
   384  					framework.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
   385  				}
   386  				passed = true
   387  				break
   388  			}
   389  			framework.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
   390  		}
   391  		if !passed {
   392  			// Sort the lists so they're easier to visually diff.
   393  			exp := expectedEndpoints.List()
   394  			got := gotEndpoints.List()
   395  			sort.StringSlice(exp).Sort()
   396  			sort.StringSlice(got).Sort()
   397  			return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
   398  		}
   399  	}
   400  	return nil
   401  }
   402  
   403  // verifyServeHostnameServiceDown verifies that the given service isn't served.
   404  func verifyServeHostnameServiceDown(ctx context.Context, c clientset.Interface, ns string, serviceIP string, servicePort int) error {
   405  	// verify from host network
   406  	hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-down-host-exec-pod")
   407  	defer func() {
   408  		e2epod.DeletePodOrFail(ctx, c, ns, hostExecPod.Name)
   409  	}()
   410  
   411  	ipPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
   412  	// The current versions of curl included in CentOS and RHEL distros
   413  	// misinterpret square brackets around IPv6 as globbing, so use the -g
   414  	// argument to disable globbing to handle the IPv6 case.
   415  	command := fmt.Sprintf(
   416  		"curl -g -s --connect-timeout 2 http://%s && echo service-down-failed", ipPort)
   417  
   418  	for start := time.Now(); time.Since(start) < e2eservice.KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
   419  		output, err := e2eoutput.RunHostCmd(ns, hostExecPod.Name, command)
   420  		if err != nil {
   421  			framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", command, ns, hostExecPod.Name, err, output)
   422  		}
   423  		if !strings.Contains(output, "service-down-failed") {
   424  			return nil
   425  		}
   426  		framework.Logf("service still alive - still waiting")
   427  	}
   428  
   429  	return fmt.Errorf("waiting for service to be down timed out")
   430  }
   431  
   432  // testNotReachableHTTP tests that a HTTP request doesn't connect to the given host and port.
   433  func testNotReachableHTTP(host string, port int, timeout time.Duration) {
   434  	pollfn := func() (bool, error) {
   435  		result := e2enetwork.PokeHTTP(host, port, "/", nil)
   436  		if result.Code == 0 {
   437  			return true, nil
   438  		}
   439  		return false, nil // caller can retry
   440  	}
   441  
   442  	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
   443  		framework.Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
   444  	}
   445  }
   446  
   447  // testRejectedHTTP tests that the given host rejects a HTTP request on the given port.
   448  func testRejectedHTTP(host string, port int, timeout time.Duration) {
   449  	pollfn := func() (bool, error) {
   450  		result := e2enetwork.PokeHTTP(host, port, "/", nil)
   451  		if result.Status == e2enetwork.HTTPRefused {
   452  			return true, nil
   453  		}
   454  		return false, nil // caller can retry
   455  	}
   456  
   457  	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
   458  		framework.Failf("HTTP service %v:%v not rejected: %v", host, port, err)
   459  	}
   460  }
   461  
   462  // UDPPokeParams is a struct for UDP poke parameters.
   463  type UDPPokeParams struct {
   464  	Timeout  time.Duration
   465  	Response string
   466  }
   467  
   468  // UDPPokeResult is a struct for UDP poke result.
   469  type UDPPokeResult struct {
   470  	Status   UDPPokeStatus
   471  	Error    error  // if there was any error
   472  	Response []byte // if code != 0
   473  }
   474  
   475  // UDPPokeStatus is string for representing UDP poke status.
   476  type UDPPokeStatus string
   477  
   478  const (
   479  	// UDPSuccess is UDP poke status which is success.
   480  	UDPSuccess UDPPokeStatus = "Success"
   481  	// UDPError is UDP poke status which is error.
   482  	UDPError UDPPokeStatus = "UnknownError"
   483  	// UDPTimeout is UDP poke status which is timeout.
   484  	UDPTimeout UDPPokeStatus = "TimedOut"
   485  	// UDPRefused is UDP poke status which is connection refused.
   486  	UDPRefused UDPPokeStatus = "ConnectionRefused"
   487  	// UDPBadResponse is UDP poke status which is bad response.
   488  	UDPBadResponse UDPPokeStatus = "BadResponse"
   489  	// Any time we add new errors, we should audit all callers of this.
   490  )
   491  
   492  // pokeUDP tries to connect to a host on a port and send the given request. Callers
   493  // can specify additional success parameters, if desired.
   494  //
   495  // The result status will be characterized as precisely as possible, given the
   496  // known users of this.
   497  //
   498  // The result error will be populated for any status other than Success.
   499  //
   500  // The result response will be populated if the UDP transaction was completed, even
   501  // if the other test params make this a failure).
   502  func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult {
   503  	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
   504  	url := fmt.Sprintf("udp://%s", hostPort)
   505  
   506  	ret := UDPPokeResult{}
   507  
   508  	// Sanity check inputs, because it has happened.  These are the only things
   509  	// that should hard fail the test - they are basically ASSERT()s.
   510  	if host == "" {
   511  		framework.Failf("Got empty host for UDP poke (%s)", url)
   512  		return ret
   513  	}
   514  	if port == 0 {
   515  		framework.Failf("Got port==0 for UDP poke (%s)", url)
   516  		return ret
   517  	}
   518  
   519  	// Set default params.
   520  	if params == nil {
   521  		params = &UDPPokeParams{}
   522  	}
   523  
   524  	framework.Logf("Poking %v", url)
   525  
   526  	con, err := net.Dial("udp", hostPort)
   527  	if err != nil {
   528  		ret.Status = UDPError
   529  		ret.Error = err
   530  		framework.Logf("Poke(%q): %v", url, err)
   531  		return ret
   532  	}
   533  
   534  	_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
   535  	if err != nil {
   536  		ret.Error = err
   537  		neterr, ok := err.(net.Error)
   538  		if ok && neterr.Timeout() {
   539  			ret.Status = UDPTimeout
   540  		} else if strings.Contains(err.Error(), "connection refused") {
   541  			ret.Status = UDPRefused
   542  		} else {
   543  			ret.Status = UDPError
   544  		}
   545  		framework.Logf("Poke(%q): %v", url, err)
   546  		return ret
   547  	}
   548  
   549  	if params.Timeout != 0 {
   550  		err = con.SetDeadline(time.Now().Add(params.Timeout))
   551  		if err != nil {
   552  			ret.Status = UDPError
   553  			ret.Error = err
   554  			framework.Logf("Poke(%q): %v", url, err)
   555  			return ret
   556  		}
   557  	}
   558  
   559  	bufsize := len(params.Response) + 1
   560  	if bufsize == 0 {
   561  		bufsize = 4096
   562  	}
   563  	var buf = make([]byte, bufsize)
   564  	n, err := con.Read(buf)
   565  	if err != nil {
   566  		ret.Error = err
   567  		neterr, ok := err.(net.Error)
   568  		if ok && neterr.Timeout() {
   569  			ret.Status = UDPTimeout
   570  		} else if strings.Contains(err.Error(), "connection refused") {
   571  			ret.Status = UDPRefused
   572  		} else {
   573  			ret.Status = UDPError
   574  		}
   575  		framework.Logf("Poke(%q): %v", url, err)
   576  		return ret
   577  	}
   578  	ret.Response = buf[0:n]
   579  
   580  	if params.Response != "" && string(ret.Response) != params.Response {
   581  		ret.Status = UDPBadResponse
   582  		ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
   583  		framework.Logf("Poke(%q): %v", url, ret.Error)
   584  		return ret
   585  	}
   586  
   587  	ret.Status = UDPSuccess
   588  	framework.Logf("Poke(%q): success", url)
   589  	return ret
   590  }
   591  
   592  // testReachableUDP tests that the given host serves UDP on the given port.
   593  func testReachableUDP(host string, port int, timeout time.Duration) {
   594  	pollfn := func() (bool, error) {
   595  		result := pokeUDP(host, port, "echo hello", &UDPPokeParams{
   596  			Timeout:  3 * time.Second,
   597  			Response: "hello",
   598  		})
   599  		if result.Status == UDPSuccess {
   600  			return true, nil
   601  		}
   602  		return false, nil // caller can retry
   603  	}
   604  
   605  	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
   606  		framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
   607  	}
   608  }
   609  
   610  // testNotReachableUDP tests that the given host doesn't serve UDP on the given port.
   611  func testNotReachableUDP(host string, port int, timeout time.Duration) {
   612  	pollfn := func() (bool, error) {
   613  		result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
   614  		if result.Status != UDPSuccess && result.Status != UDPError {
   615  			return true, nil
   616  		}
   617  		return false, nil // caller can retry
   618  	}
   619  	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
   620  		framework.Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
   621  	}
   622  }
   623  
   624  // testRejectedUDP tests that the given host rejects a UDP request on the given port.
   625  func testRejectedUDP(host string, port int, timeout time.Duration) {
   626  	pollfn := func() (bool, error) {
   627  		result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
   628  		if result.Status == UDPRefused {
   629  			return true, nil
   630  		}
   631  		return false, nil // caller can retry
   632  	}
   633  	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
   634  		framework.Failf("UDP service %v:%v not rejected: %v", host, port, err)
   635  	}
   636  }
   637  
   638  // TestHTTPHealthCheckNodePort tests a HTTP connection by the given request to the given host and port.
   639  func TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error {
   640  	count := 0
   641  	condition := func() (bool, error) {
   642  		success, _ := testHTTPHealthCheckNodePort(host, port, request)
   643  		if success && expectSucceed ||
   644  			!success && !expectSucceed {
   645  			count++
   646  		}
   647  		if count >= threshold {
   648  			return true, nil
   649  		}
   650  		return false, nil
   651  	}
   652  
   653  	if err := wait.PollImmediate(time.Second, timeout, condition); err != nil {
   654  		return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v%v, got %d", threshold, expectSucceed, host, port, count)
   655  	}
   656  	return nil
   657  }
   658  
   659  func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
   660  	ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
   661  	url := fmt.Sprintf("http://%s%s", ipPort, request)
   662  	if ip == "" || port == 0 {
   663  		framework.Failf("Got empty IP for reachability check (%s)", url)
   664  		return false, fmt.Errorf("invalid input ip or port")
   665  	}
   666  	framework.Logf("Testing HTTP health check on %v", url)
   667  	resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
   668  	if err != nil {
   669  		framework.Logf("Got error testing for reachability of %s: %v", url, err)
   670  		return false, err
   671  	}
   672  	defer resp.Body.Close()
   673  	if err != nil {
   674  		framework.Logf("Got error reading response from %s: %v", url, err)
   675  		return false, err
   676  	}
   677  	// HealthCheck responder returns 503 for no local endpoints
   678  	if resp.StatusCode == 503 {
   679  		return false, nil
   680  	}
   681  	// HealthCheck responder returns 200 for non-zero local endpoints
   682  	if resp.StatusCode == 200 {
   683  		return true, nil
   684  	}
   685  	return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
   686  }
   687  
   688  func testHTTPHealthCheckNodePortFromTestContainer(ctx context.Context, config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, expectSucceed bool, threshold int) error {
   689  	count := 0
   690  	pollFn := func() (bool, error) {
   691  		statusCode, err := config.GetHTTPCodeFromTestContainer(ctx,
   692  			"/healthz",
   693  			host,
   694  			port)
   695  		if err != nil {
   696  			framework.Logf("Got error reading status code from http://%s:%d/healthz via test container: %v", host, port, err)
   697  			return false, nil
   698  		}
   699  		framework.Logf("Got status code from http://%s:%d/healthz via test container: %d", host, port, statusCode)
   700  		success := statusCode == 200
   701  		if (success && expectSucceed) ||
   702  			(!success && !expectSucceed) {
   703  			count++
   704  		}
   705  		return count >= threshold, nil
   706  	}
   707  	err := wait.PollImmediate(time.Second, timeout, pollFn)
   708  	if err != nil {
   709  		return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v:%v/healthz, got %d", threshold, expectSucceed, host, port, count)
   710  	}
   711  	return nil
   712  }
   713  
   714  // Does an HTTP GET, but does not reuse TCP connections
   715  // This masks problems where the iptables rule has changed, but we don't see it
   716  func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
   717  	tr := utilnet.SetTransportDefaults(&http.Transport{
   718  		DisableKeepAlives: true,
   719  	})
   720  	client := &http.Client{
   721  		Transport: tr,
   722  		Timeout:   timeout,
   723  	}
   724  	return client.Get(url)
   725  }
   726  
   727  func getServeHostnameService(name string) *v1.Service {
   728  	svc := defaultServeHostnameService.DeepCopy()
   729  	svc.ObjectMeta.Name = name
   730  	svc.Spec.Selector["name"] = name
   731  	return svc
   732  }
   733  
   734  // waitForAPIServerUp waits for the kube-apiserver to be up.
   735  func waitForAPIServerUp(ctx context.Context, c clientset.Interface) error {
   736  	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
   737  		body, err := c.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(ctx).Raw()
   738  		if err == nil && string(body) == "ok" {
   739  			return nil
   740  		}
   741  	}
   742  	return fmt.Errorf("waiting for apiserver timed out")
   743  }
   744  
   745  // getEndpointNodesWithInternalIP returns a map of nodenames:internal-ip on which the
   746  // endpoints of the Service are running.
   747  func getEndpointNodesWithInternalIP(ctx context.Context, jig *e2eservice.TestJig) (map[string]string, error) {
   748  	nodesWithIPs, err := jig.GetEndpointNodesWithIP(ctx, v1.NodeInternalIP)
   749  	if err != nil {
   750  		return nil, err
   751  	}
   752  	endpointsNodeMap := make(map[string]string, len(nodesWithIPs))
   753  	for nodeName, internalIPs := range nodesWithIPs {
   754  		if len(internalIPs) < 1 {
   755  			return nil, fmt.Errorf("no internal ip found for node %s", nodeName)
   756  		}
   757  		endpointsNodeMap[nodeName] = internalIPs[0]
   758  	}
   759  	return endpointsNodeMap, nil
   760  }
   761  
   762  var _ = common.SIGDescribe("Services", func() {
   763  	f := framework.NewDefaultFramework("services")
   764  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
   765  
   766  	var cs clientset.Interface
   767  
   768  	ginkgo.BeforeEach(func() {
   769  		cs = f.ClientSet
   770  	})
   771  
   772  	// TODO: We get coverage of TCP/UDP and multi-port services through the DNS test. We should have a simpler test for multi-port TCP here.
   773  
   774  	/*
   775  		Release: v1.9
   776  		Testname: Kubernetes Service
   777  		Description: By default when a kubernetes cluster is running there MUST be a 'kubernetes' service running in the cluster.
   778  	*/
   779  	framework.ConformanceIt("should provide secure master service", func(ctx context.Context) {
   780  		_, err := cs.CoreV1().Services(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{})
   781  		framework.ExpectNoError(err, "failed to fetch the service object for the service named kubernetes")
   782  	})
   783  
   784  	/*
   785  		Release: v1.9
   786  		Testname: Service, endpoints
   787  		Description: Create a service with a endpoint without any Pods, the service MUST run and show empty endpoints. Add a pod to the service and the service MUST validate to show all the endpoints for the ports exposed by the Pod. Add another Pod then the list of all Ports exposed by both the Pods MUST be valid and have corresponding service endpoint. Once the second Pod is deleted then set of endpoint MUST be validated to show only ports from the first container that are exposed. Once both pods are deleted the endpoints from the service MUST be empty.
   788  	*/
   789  	framework.ConformanceIt("should serve a basic endpoint from pods", func(ctx context.Context) {
   790  		serviceName := "endpoint-test2"
   791  		ns := f.Namespace.Name
   792  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
   793  
   794  		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
   795  		ginkgo.DeferCleanup(func(ctx context.Context) {
   796  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
   797  			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
   798  		})
   799  		svc, err := jig.CreateTCPServiceWithPort(ctx, nil, 80)
   800  		framework.ExpectNoError(err)
   801  
   802  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
   803  
   804  		names := map[string]bool{}
   805  		ginkgo.DeferCleanup(func(ctx context.Context) {
   806  			for name := range names {
   807  				err := cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
   808  				framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
   809  			}
   810  		})
   811  
   812  		name1 := "pod1"
   813  		name2 := "pod2"
   814  
   815  		createPodOrFail(ctx, f, ns, name1, jig.Labels, []v1.ContainerPort{{ContainerPort: 80}}, "netexec", "--http-port", "80")
   816  		names[name1] = true
   817  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{name1: {80}})
   818  
   819  		ginkgo.By("Checking if the Service forwards traffic to pod1")
   820  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
   821  		err = jig.CheckServiceReachability(ctx, svc, execPod)
   822  		framework.ExpectNoError(err)
   823  
   824  		createPodOrFail(ctx, f, ns, name2, jig.Labels, []v1.ContainerPort{{ContainerPort: 80}}, "netexec", "--http-port", "80")
   825  		names[name2] = true
   826  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{name1: {80}, name2: {80}})
   827  
   828  		ginkgo.By("Checking if the Service forwards traffic to pod1 and pod2")
   829  		err = jig.CheckServiceReachability(ctx, svc, execPod)
   830  		framework.ExpectNoError(err)
   831  
   832  		e2epod.DeletePodOrFail(ctx, cs, ns, name1)
   833  		delete(names, name1)
   834  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{name2: {80}})
   835  
   836  		ginkgo.By("Checking if the Service forwards traffic to pod2")
   837  		err = jig.CheckServiceReachability(ctx, svc, execPod)
   838  		framework.ExpectNoError(err)
   839  
   840  		e2epod.DeletePodOrFail(ctx, cs, ns, name2)
   841  		delete(names, name2)
   842  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
   843  	})
   844  
   845  	/*
   846  		Release: v1.9
   847  		Testname: Service, endpoints with multiple ports
   848  		Description: Create a service with two ports but no Pods are added to the service yet.  The service MUST run and show empty set of endpoints. Add a Pod to the first port, service MUST list one endpoint for the Pod on that port. Add another Pod to the second port, service MUST list both the endpoints. Delete the first Pod and the service MUST list only the endpoint to the second Pod. Delete the second Pod and the service must now have empty set of endpoints.
   849  	*/
   850  	framework.ConformanceIt("should serve multiport endpoints from pods", func(ctx context.Context) {
   851  		// repacking functionality is intentionally not tested here - it's better to test it in an integration test.
   852  		serviceName := "multi-endpoint-test"
   853  		ns := f.Namespace.Name
   854  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
   855  
   856  		ginkgo.DeferCleanup(func(ctx context.Context) {
   857  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
   858  			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
   859  		})
   860  
   861  		svc1port := "svc1"
   862  		svc2port := "svc2"
   863  
   864  		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
   865  		svc, err := jig.CreateTCPService(ctx, func(service *v1.Service) {
   866  			service.Spec.Ports = []v1.ServicePort{
   867  				{
   868  					Name:       "portname1",
   869  					Port:       80,
   870  					TargetPort: intstr.FromString(svc1port),
   871  				},
   872  				{
   873  					Name:       "portname2",
   874  					Port:       81,
   875  					TargetPort: intstr.FromString(svc2port),
   876  				},
   877  			}
   878  		})
   879  		framework.ExpectNoError(err)
   880  
   881  		port1 := 100
   882  		port2 := 101
   883  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
   884  
   885  		names := map[string]bool{}
   886  		ginkgo.DeferCleanup(func(ctx context.Context) {
   887  			for name := range names {
   888  				err := cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
   889  				framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
   890  			}
   891  		})
   892  
   893  		containerPorts1 := []v1.ContainerPort{
   894  			{
   895  				Name:          svc1port,
   896  				ContainerPort: int32(port1),
   897  			},
   898  		}
   899  		containerPorts2 := []v1.ContainerPort{
   900  			{
   901  				Name:          svc2port,
   902  				ContainerPort: int32(port2),
   903  			},
   904  		}
   905  
   906  		podname1 := "pod1"
   907  		podname2 := "pod2"
   908  
   909  		createPodOrFail(ctx, f, ns, podname1, jig.Labels, containerPorts1, "netexec", "--http-port", strconv.Itoa(port1))
   910  		names[podname1] = true
   911  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}})
   912  
   913  		createPodOrFail(ctx, f, ns, podname2, jig.Labels, containerPorts2, "netexec", "--http-port", strconv.Itoa(port2))
   914  		names[podname2] = true
   915  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}, podname2: {port2}})
   916  
   917  		ginkgo.By("Checking if the Service forwards traffic to pods")
   918  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
   919  		err = jig.CheckServiceReachability(ctx, svc, execPod)
   920  		framework.ExpectNoError(err)
   921  
   922  		e2epod.DeletePodOrFail(ctx, cs, ns, podname1)
   923  		delete(names, podname1)
   924  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname2: {port2}})
   925  
   926  		e2epod.DeletePodOrFail(ctx, cs, ns, podname2)
   927  		delete(names, podname2)
   928  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
   929  	})
   930  
   931  	ginkgo.It("should be updated after adding or deleting ports ", func(ctx context.Context) {
   932  		serviceName := "edit-port-test"
   933  		ns := f.Namespace.Name
   934  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
   935  
   936  		svc1port := "svc1"
   937  		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
   938  		svc, err := jig.CreateTCPService(ctx, func(service *v1.Service) {
   939  			service.Spec.Ports = []v1.ServicePort{
   940  				{
   941  					Name:       "portname1",
   942  					Port:       80,
   943  					TargetPort: intstr.FromString(svc1port),
   944  				},
   945  			}
   946  		})
   947  		framework.ExpectNoError(err)
   948  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
   949  
   950  		podname1 := "pod1"
   951  		port1 := 100
   952  		containerPorts1 := []v1.ContainerPort{
   953  			{
   954  				Name:          svc1port,
   955  				ContainerPort: int32(port1),
   956  			},
   957  		}
   958  		createPodOrFail(ctx, f, ns, podname1, jig.Labels, containerPorts1, "netexec", "--http-port", strconv.Itoa(port1))
   959  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}})
   960  
   961  		ginkgo.By("Checking if the Service " + serviceName + " forwards traffic to " + podname1)
   962  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
   963  		err = jig.CheckServiceReachability(ctx, svc, execPod)
   964  		framework.ExpectNoError(err)
   965  
   966  		ginkgo.By("Adding a new port to service " + serviceName)
   967  		svc2port := "svc2"
   968  		svc, err = jig.UpdateService(ctx, func(s *v1.Service) {
   969  			s.Spec.Ports = []v1.ServicePort{
   970  				{
   971  					Name:       "portname1",
   972  					Port:       80,
   973  					TargetPort: intstr.FromString(svc1port),
   974  				},
   975  				{
   976  					Name:       "portname2",
   977  					Port:       81,
   978  					TargetPort: intstr.FromString(svc2port),
   979  				},
   980  			}
   981  		})
   982  		framework.ExpectNoError(err)
   983  
   984  		ginkgo.By("Adding a new endpoint to the new port ")
   985  		podname2 := "pod2"
   986  		port2 := 101
   987  		containerPorts2 := []v1.ContainerPort{
   988  			{
   989  				Name:          svc2port,
   990  				ContainerPort: int32(port2),
   991  			},
   992  		}
   993  		createPodOrFail(ctx, f, ns, podname2, jig.Labels, containerPorts2, "netexec", "--http-port", strconv.Itoa(port2))
   994  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}, podname2: {port2}})
   995  
   996  		ginkgo.By("Checking if the Service forwards traffic to " + podname1 + " and " + podname2)
   997  		err = jig.CheckServiceReachability(ctx, svc, execPod)
   998  		framework.ExpectNoError(err)
   999  
  1000  		ginkgo.By("Deleting a port from service " + serviceName)
  1001  		svc, err = jig.UpdateService(ctx, func(s *v1.Service) {
  1002  			s.Spec.Ports = []v1.ServicePort{
  1003  				{
  1004  					Name:       "portname1",
  1005  					Port:       80,
  1006  					TargetPort: intstr.FromString(svc1port),
  1007  				},
  1008  			}
  1009  		})
  1010  		framework.ExpectNoError(err)
  1011  
  1012  		ginkgo.By("Checking if the Service forwards traffic to " + podname1 + " and not forwards to " + podname2)
  1013  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}})
  1014  		err = jig.CheckServiceReachability(ctx, svc, execPod)
  1015  		framework.ExpectNoError(err)
  1016  	})
  1017  
  1018  	ginkgo.It("should preserve source pod IP for traffic thru service cluster IP [LinuxOnly]", func(ctx context.Context) {
  1019  		// this test is creating a pod with HostNetwork=true, which is not supported on Windows.
  1020  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  1021  
  1022  		serviceName := "sourceip-test"
  1023  		ns := f.Namespace.Name
  1024  
  1025  		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns)
  1026  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1027  		jig.ExternalIPs = false
  1028  		servicePort := 8080
  1029  		tcpService, err := jig.CreateTCPServiceWithPort(ctx, nil, int32(servicePort))
  1030  		framework.ExpectNoError(err)
  1031  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1032  			framework.Logf("Cleaning up the sourceip test service")
  1033  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  1034  			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
  1035  		})
  1036  		serviceIP := tcpService.Spec.ClusterIP
  1037  		framework.Logf("sourceip-test cluster ip: %s", serviceIP)
  1038  
  1039  		ginkgo.By("Picking 2 Nodes to test whether source IP is preserved or not")
  1040  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  1041  		framework.ExpectNoError(err)
  1042  		nodeCounts := len(nodes.Items)
  1043  		if nodeCounts < 2 {
  1044  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  1045  		}
  1046  
  1047  		ginkgo.By("Creating a webserver pod to be part of the TCP service which echoes back source ip")
  1048  		serverPodName := "echo-sourceip"
  1049  		pod := e2epod.NewAgnhostPod(ns, serverPodName, nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort))
  1050  		pod.Labels = jig.Labels
  1051  		_, err = cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
  1052  		framework.ExpectNoError(err)
  1053  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout))
  1054  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1055  			framework.Logf("Cleaning up the echo server pod")
  1056  			err := cs.CoreV1().Pods(ns).Delete(ctx, serverPodName, metav1.DeleteOptions{})
  1057  			framework.ExpectNoError(err, "failed to delete pod: %s on node", serverPodName)
  1058  		})
  1059  
  1060  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{serverPodName: {servicePort}})
  1061  
  1062  		ginkgo.By("Creating pause pod deployment")
  1063  		deployment := createPausePodDeployment(ctx, cs, "pause-pod", ns, nodeCounts)
  1064  
  1065  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1066  			framework.Logf("Deleting deployment")
  1067  			err = cs.AppsV1().Deployments(ns).Delete(ctx, deployment.Name, metav1.DeleteOptions{})
  1068  			framework.ExpectNoError(err, "Failed to delete deployment %s", deployment.Name)
  1069  		})
  1070  
  1071  		framework.ExpectNoError(e2edeployment.WaitForDeploymentComplete(cs, deployment), "Failed to complete pause pod deployment")
  1072  
  1073  		deployment, err = cs.AppsV1().Deployments(ns).Get(ctx, deployment.Name, metav1.GetOptions{})
  1074  		framework.ExpectNoError(err, "Error in retrieving pause pod deployment")
  1075  		labelSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
  1076  
  1077  		pausePods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()})
  1078  		framework.ExpectNoError(err, "Error in listing pods associated with pause pod deployments")
  1079  
  1080  		gomega.Expect(pausePods.Items[0].Spec.NodeName).NotTo(gomega.Equal(pausePods.Items[1].Spec.NodeName))
  1081  
  1082  		serviceAddress := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
  1083  
  1084  		for _, pausePod := range pausePods.Items {
  1085  			sourceIP, execPodIP := execSourceIPTest(pausePod, serviceAddress)
  1086  			ginkgo.By("Verifying the preserved source ip")
  1087  			gomega.Expect(sourceIP).To(gomega.Equal(execPodIP))
  1088  		}
  1089  	})
  1090  
  1091  	ginkgo.It("should allow pods to hairpin back to themselves through services", func(ctx context.Context) {
  1092  		serviceName := "hairpin-test"
  1093  		ns := f.Namespace.Name
  1094  
  1095  		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns)
  1096  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1097  		jig.ExternalIPs = false
  1098  		servicePort := 8080
  1099  		svc, err := jig.CreateTCPServiceWithPort(ctx, nil, int32(servicePort))
  1100  		framework.ExpectNoError(err)
  1101  		serviceIP := svc.Spec.ClusterIP
  1102  		framework.Logf("hairpin-test cluster ip: %s", serviceIP)
  1103  
  1104  		ginkgo.By("creating a client/server pod")
  1105  		serverPodName := "hairpin"
  1106  		podTemplate := e2epod.NewAgnhostPod(ns, serverPodName, nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort))
  1107  		podTemplate.Labels = jig.Labels
  1108  		pod, err := cs.CoreV1().Pods(ns).Create(ctx, podTemplate, metav1.CreateOptions{})
  1109  		framework.ExpectNoError(err)
  1110  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout))
  1111  
  1112  		ginkgo.By("waiting for the service to expose an endpoint")
  1113  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{serverPodName: {servicePort}})
  1114  
  1115  		ginkgo.By("Checking if the pod can reach itself")
  1116  		err = jig.CheckServiceReachability(ctx, svc, pod)
  1117  		framework.ExpectNoError(err)
  1118  	})
  1119  
  1120  	ginkgo.It("should be able to up and down services", func(ctx context.Context) {
  1121  		ns := f.Namespace.Name
  1122  		numPods, servicePort := 3, defaultServeHostnameServicePort
  1123  
  1124  		svc1 := "up-down-1"
  1125  		svc2 := "up-down-2"
  1126  		svc3 := "up-down-3"
  1127  
  1128  		ginkgo.By("creating " + svc1 + " in namespace " + ns)
  1129  		podNames1, svc1IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc1), ns, numPods)
  1130  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)
  1131  		ginkgo.By("creating " + svc2 + " in namespace " + ns)
  1132  		podNames2, svc2IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc2), ns, numPods)
  1133  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)
  1134  
  1135  		ginkgo.By("verifying service " + svc1 + " is up")
  1136  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
  1137  
  1138  		ginkgo.By("verifying service " + svc2 + " is up")
  1139  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))
  1140  
  1141  		// Stop service 1 and make sure it is gone.
  1142  		ginkgo.By("stopping service " + svc1)
  1143  		framework.ExpectNoError(StopServeHostnameService(ctx, f.ClientSet, ns, svc1))
  1144  
  1145  		ginkgo.By("verifying service " + svc1 + " is not up")
  1146  		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svc1IP, servicePort))
  1147  		ginkgo.By("verifying service " + svc2 + " is still up")
  1148  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))
  1149  
  1150  		// Start another service and verify both are up.
  1151  		ginkgo.By("creating service " + svc3 + " in namespace " + ns)
  1152  		podNames3, svc3IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc3), ns, numPods)
  1153  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc3, ns)
  1154  
  1155  		if svc2IP == svc3IP {
  1156  			framework.Failf("service IPs conflict: %v", svc2IP)
  1157  		}
  1158  
  1159  		ginkgo.By("verifying service " + svc2 + " is still up")
  1160  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))
  1161  
  1162  		ginkgo.By("verifying service " + svc3 + " is up")
  1163  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames3, svc3IP, servicePort))
  1164  	})
  1165  
  1166  	ginkgo.It("should work after the service has been recreated", func(ctx context.Context) {
  1167  		serviceName := "service-deletion"
  1168  		ns := f.Namespace.Name
  1169  		numPods, servicePort := 1, defaultServeHostnameServicePort
  1170  
  1171  		ginkgo.By("creating the service " + serviceName + " in namespace " + ns)
  1172  		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, serviceName)
  1173  		podNames, svcIP, _ := StartServeHostnameService(ctx, cs, getServeHostnameService(serviceName), ns, numPods)
  1174  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames, svcIP, servicePort))
  1175  
  1176  		ginkgo.By("deleting the service " + serviceName + " in namespace " + ns)
  1177  		err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  1178  		framework.ExpectNoError(err)
  1179  
  1180  		ginkgo.By("Waiting for the service " + serviceName + " in namespace " + ns + " to disappear")
  1181  		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.RespondingTimeout, func() (bool, error) {
  1182  			_, err := cs.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
  1183  			if err != nil {
  1184  				if apierrors.IsNotFound(err) {
  1185  					framework.Logf("Service %s/%s is gone.", ns, serviceName)
  1186  					return true, nil
  1187  				}
  1188  				return false, err
  1189  			}
  1190  			framework.Logf("Service %s/%s still exists", ns, serviceName)
  1191  			return false, nil
  1192  		}); pollErr != nil {
  1193  			framework.Failf("Failed to wait for service to disappear: %v", pollErr)
  1194  		}
  1195  
  1196  		ginkgo.By("recreating the service " + serviceName + " in namespace " + ns)
  1197  		svc, err := cs.CoreV1().Services(ns).Create(ctx, getServeHostnameService(serviceName), metav1.CreateOptions{})
  1198  		framework.ExpectNoError(err)
  1199  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames, svc.Spec.ClusterIP, servicePort))
  1200  	})
  1201  
  1202  	f.It("should work after restarting kube-proxy", f.WithDisruptive(), func(ctx context.Context) {
  1203  		kubeProxyLabelSet := map[string]string{clusterAddonLabelKey: kubeProxyLabelName}
  1204  		e2eskipper.SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem(ctx, kubeProxyLabelName, cs, metav1.NamespaceSystem, kubeProxyLabelSet)
  1205  
  1206  		// TODO: use the ServiceTestJig here
  1207  		ns := f.Namespace.Name
  1208  		numPods, servicePort := 3, defaultServeHostnameServicePort
  1209  
  1210  		svc1 := "restart-proxy-1"
  1211  		svc2 := "restart-proxy-2"
  1212  
  1213  		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, svc1)
  1214  		podNames1, svc1IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc1), ns, numPods)
  1215  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)
  1216  
  1217  		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, svc2)
  1218  		podNames2, svc2IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc2), ns, numPods)
  1219  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)
  1220  
  1221  		if svc1IP == svc2IP {
  1222  			framework.Failf("VIPs conflict: %v", svc1IP)
  1223  		}
  1224  
  1225  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
  1226  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))
  1227  
  1228  		if err := restartComponent(ctx, cs, kubeProxyLabelName, metav1.NamespaceSystem, kubeProxyLabelSet); err != nil {
  1229  			framework.Failf("error restarting kube-proxy: %v", err)
  1230  		}
  1231  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
  1232  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))
  1233  	})
  1234  
  1235  	f.It("should work after restarting apiserver", f.WithDisruptive(), func(ctx context.Context) {
  1236  
  1237  		if !framework.ProviderIs("gke") {
  1238  			e2eskipper.SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem(ctx, kubeAPIServerLabelName, cs, metav1.NamespaceSystem, map[string]string{clusterComponentKey: kubeAPIServerLabelName})
  1239  		}
  1240  
  1241  		// TODO: use the ServiceTestJig here
  1242  		ns := f.Namespace.Name
  1243  		numPods, servicePort := 3, defaultServeHostnameServicePort
  1244  
  1245  		svc1 := "restart-apiserver-1"
  1246  		svc2 := "restart-apiserver-2"
  1247  
  1248  		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, svc1)
  1249  		podNames1, svc1IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc1), ns, numPods)
  1250  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)
  1251  
  1252  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
  1253  
  1254  		// Restart apiserver
  1255  		ginkgo.By("Restarting apiserver")
  1256  		if err := restartApiserver(ctx, ns, cs); err != nil {
  1257  			framework.Failf("error restarting apiserver: %v", err)
  1258  		}
  1259  		ginkgo.By("Waiting for apiserver to come up by polling /healthz")
  1260  		if err := waitForAPIServerUp(ctx, cs); err != nil {
  1261  			framework.Failf("error while waiting for apiserver up: %v", err)
  1262  		}
  1263  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
  1264  
  1265  		// Create a new service and check if it's not reusing IP.
  1266  		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, svc2)
  1267  		podNames2, svc2IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc2), ns, numPods)
  1268  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)
  1269  
  1270  		if svc1IP == svc2IP {
  1271  			framework.Failf("VIPs conflict: %v", svc1IP)
  1272  		}
  1273  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
  1274  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))
  1275  	})
  1276  
  1277  	/*
  1278  		Release: v1.16
  1279  		Testname: Service, NodePort Service
  1280  		Description: Create a TCP NodePort service, and test reachability from a client Pod.
  1281  		The client Pod MUST be able to access the NodePort service by service name and cluster
  1282  		IP on the service port, and on nodes' internal and external IPs on the NodePort.
  1283  	*/
  1284  	framework.ConformanceIt("should be able to create a functioning NodePort service", func(ctx context.Context) {
  1285  		serviceName := "nodeport-test"
  1286  		ns := f.Namespace.Name
  1287  
  1288  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1289  
  1290  		ginkgo.By("creating service " + serviceName + " with type=NodePort in namespace " + ns)
  1291  		nodePortService, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  1292  			svc.Spec.Type = v1.ServiceTypeNodePort
  1293  			svc.Spec.Ports = []v1.ServicePort{
  1294  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(9376)},
  1295  			}
  1296  		})
  1297  		framework.ExpectNoError(err)
  1298  		err = jig.CreateServicePods(ctx, 2)
  1299  		framework.ExpectNoError(err)
  1300  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  1301  		err = jig.CheckServiceReachability(ctx, nodePortService, execPod)
  1302  		framework.ExpectNoError(err)
  1303  	})
  1304  
  1305  	/*
  1306  		Create a ClusterIP service with an External IP that is not assigned to an interface.
  1307  		The IP ranges here are reserved for documentation according to
  1308  		[RFC 5737](https://tools.ietf.org/html/rfc5737) Section 3 and should not be used by any host.
  1309  	*/
  1310  	ginkgo.It("should be possible to connect to a service via ExternalIP when the external IP is not assigned to a node", func(ctx context.Context) {
  1311  		serviceName := "externalip-test"
  1312  		ns := f.Namespace.Name
  1313  		externalIP := "203.0.113.250"
  1314  		if framework.TestContext.ClusterIsIPv6() {
  1315  			externalIP = "2001:DB8::cb00:71fa"
  1316  		}
  1317  
  1318  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1319  		jig.ExternalIPs = false
  1320  
  1321  		ginkgo.By("creating service " + serviceName + " with type=clusterIP in namespace " + ns)
  1322  		clusterIPService, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  1323  			svc.Spec.Type = v1.ServiceTypeClusterIP
  1324  			svc.Spec.ExternalIPs = []string{externalIP}
  1325  			svc.Spec.Ports = []v1.ServicePort{
  1326  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(9376)},
  1327  			}
  1328  		})
  1329  		if err != nil && strings.Contains(err.Error(), "Use of external IPs is denied by admission control") {
  1330  			e2eskipper.Skipf("Admission controller to deny services with external IPs is enabled - skip.")
  1331  		}
  1332  		framework.ExpectNoError(err)
  1333  		err = jig.CreateServicePods(ctx, 2)
  1334  		framework.ExpectNoError(err)
  1335  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  1336  		err = jig.CheckServiceReachability(ctx, clusterIPService, execPod)
  1337  		framework.ExpectNoError(err)
  1338  	})
  1339  
  1340  	/*
  1341  		Testname: Service, update NodePort, same port different protocol
		Description: Create a service to accept TCP requests. By default, created service MUST be of type ClusterIP and a ClusterIP MUST be assigned to the service.
  1343  		When service type is updated to NodePort supporting TCP protocol, it MUST be reachable on nodeIP over allocated NodePort to serve TCP requests.
  1344  		When this NodePort service is updated to use two protocols i.e. TCP and UDP for same assigned service port 80, service update MUST be successful by allocating two NodePorts to the service and
  1345  		service MUST be able to serve both TCP and UDP requests over same service port 80.
  1346  	*/
  1347  	ginkgo.It("should be able to update service type to NodePort listening on same port number but different protocols", func(ctx context.Context) {
  1348  		serviceName := "nodeport-update-service"
  1349  		ns := f.Namespace.Name
  1350  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1351  		jig.ExternalIPs = false
  1352  
  1353  		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns)
  1354  		tcpService, err := jig.CreateTCPService(ctx, nil)
  1355  		framework.ExpectNoError(err)
  1356  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1357  			framework.Logf("Cleaning up the updating NodePorts test service")
  1358  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  1359  			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
  1360  		})
  1361  		framework.Logf("Service Port TCP: %v", tcpService.Spec.Ports[0].Port)
  1362  
  1363  		ginkgo.By("changing the TCP service to type=NodePort")
  1364  		nodePortService, err := jig.UpdateService(ctx, func(s *v1.Service) {
  1365  			s.Spec.Type = v1.ServiceTypeNodePort
  1366  			s.Spec.Ports = []v1.ServicePort{
  1367  				{
  1368  					Name:       "tcp-port",
  1369  					Port:       80,
  1370  					Protocol:   v1.ProtocolTCP,
  1371  					TargetPort: intstr.FromInt32(9376),
  1372  				},
  1373  			}
  1374  		})
  1375  		framework.ExpectNoError(err)
  1376  
  1377  		err = jig.CreateServicePods(ctx, 2)
  1378  		framework.ExpectNoError(err)
  1379  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  1380  		err = jig.CheckServiceReachability(ctx, nodePortService, execPod)
  1381  		framework.ExpectNoError(err)
  1382  
  1383  		ginkgo.By("Updating NodePort service to listen TCP and UDP based requests over same Port")
  1384  		nodePortService, err = jig.UpdateService(ctx, func(s *v1.Service) {
  1385  			s.Spec.Type = v1.ServiceTypeNodePort
  1386  			s.Spec.Ports = []v1.ServicePort{
  1387  				{
  1388  					Name:       "tcp-port",
  1389  					Port:       80,
  1390  					Protocol:   v1.ProtocolTCP,
  1391  					TargetPort: intstr.FromInt32(9376),
  1392  				},
  1393  				{
  1394  					Name:       "udp-port",
  1395  					Port:       80,
  1396  					Protocol:   v1.ProtocolUDP,
  1397  					TargetPort: intstr.FromInt32(9376),
  1398  				},
  1399  			}
  1400  		})
  1401  		framework.ExpectNoError(err)
  1402  		err = jig.CheckServiceReachability(ctx, nodePortService, execPod)
  1403  		framework.ExpectNoError(err)
  1404  		nodePortCounts := len(nodePortService.Spec.Ports)
  1405  		gomega.Expect(nodePortCounts).To(gomega.Equal(2), "updated service should have two Ports but found %d Ports", nodePortCounts)
  1406  
  1407  		for _, port := range nodePortService.Spec.Ports {
  1408  			gomega.Expect(port.NodePort).ToNot(gomega.BeZero(), "NodePort service failed to allocate NodePort for Port %s", port.Name)
  1409  			framework.Logf("NodePort service allocates NodePort: %d for Port: %s over Protocol: %s", port.NodePort, port.Name, port.Protocol)
  1410  		}
  1411  	})
  1412  
  1413  	/*
  1414  		Release: v1.16
  1415  		Testname: Service, change type, ExternalName to ClusterIP
  1416  		Description: Create a service of type ExternalName, pointing to external DNS. ClusterIP MUST not be assigned to the service.
  1417  		Update the service from ExternalName to ClusterIP by removing ExternalName entry, assigning port 80 as service port and TCP as protocol.
  1418  		Service update MUST be successful by assigning ClusterIP to the service and it MUST be reachable over serviceName and ClusterIP on provided service port.
  1419  	*/
  1420  	framework.ConformanceIt("should be able to change the type from ExternalName to ClusterIP", func(ctx context.Context) {
  1421  		serviceName := "externalname-service"
  1422  		ns := f.Namespace.Name
  1423  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1424  
  1425  		ginkgo.By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns)
  1426  		_, err := jig.CreateExternalNameService(ctx, nil)
  1427  		framework.ExpectNoError(err)
  1428  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1429  			framework.Logf("Cleaning up the ExternalName to ClusterIP test service")
  1430  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  1431  			framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
  1432  		})
  1433  
  1434  		ginkgo.By("changing the ExternalName service to type=ClusterIP")
  1435  		clusterIPService, err := jig.UpdateService(ctx, func(s *v1.Service) {
  1436  			s.Spec.Type = v1.ServiceTypeClusterIP
  1437  			s.Spec.ExternalName = ""
  1438  			s.Spec.Ports = []v1.ServicePort{
  1439  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(9376)},
  1440  			}
  1441  		})
  1442  		framework.ExpectNoError(err)
  1443  
  1444  		err = jig.CreateServicePods(ctx, 2)
  1445  		framework.ExpectNoError(err)
  1446  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  1447  		err = jig.CheckServiceReachability(ctx, clusterIPService, execPod)
  1448  		framework.ExpectNoError(err)
  1449  	})
  1450  
  1451  	/*
  1452  		Release: v1.16
  1453  		Testname: Service, change type, ExternalName to NodePort
  1454  		Description: Create a service of type ExternalName, pointing to external DNS. ClusterIP MUST not be assigned to the service.
  1455  		Update the service from ExternalName to NodePort, assigning port 80 as service port and, TCP as protocol.
  1456  		service update MUST be successful by exposing service on every node's IP on dynamically assigned NodePort and, ClusterIP MUST be assigned to route service requests.
  1457  		Service MUST be reachable over serviceName and the ClusterIP on servicePort. Service MUST also be reachable over node's IP on NodePort.
  1458  	*/
  1459  	framework.ConformanceIt("should be able to change the type from ExternalName to NodePort", func(ctx context.Context) {
  1460  		serviceName := "externalname-service"
  1461  		ns := f.Namespace.Name
  1462  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1463  
  1464  		ginkgo.By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns)
  1465  		_, err := jig.CreateExternalNameService(ctx, nil)
  1466  		framework.ExpectNoError(err)
  1467  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1468  			framework.Logf("Cleaning up the ExternalName to NodePort test service")
  1469  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  1470  			framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
  1471  		})
  1472  
  1473  		ginkgo.By("changing the ExternalName service to type=NodePort")
  1474  		nodePortService, err := jig.UpdateService(ctx, func(s *v1.Service) {
  1475  			s.Spec.Type = v1.ServiceTypeNodePort
  1476  			s.Spec.ExternalName = ""
  1477  			s.Spec.Ports = []v1.ServicePort{
  1478  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(9376)},
  1479  			}
  1480  		})
  1481  		framework.ExpectNoError(err)
  1482  		err = jig.CreateServicePods(ctx, 2)
  1483  		framework.ExpectNoError(err)
  1484  
  1485  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  1486  		err = jig.CheckServiceReachability(ctx, nodePortService, execPod)
  1487  		framework.ExpectNoError(err)
  1488  	})
  1489  
  1490  	/*
  1491  		Release: v1.16
  1492  		Testname: Service, change type, ClusterIP to ExternalName
  1493  		Description: Create a service of type ClusterIP. Service creation MUST be successful by assigning ClusterIP to the service.
  1494  		Update service type from ClusterIP to ExternalName by setting CNAME entry as externalName. Service update MUST be successful and service MUST not has associated ClusterIP.
  1495  		Service MUST be able to resolve to IP address by returning A records ensuring service is pointing to provided externalName.
  1496  	*/
  1497  	framework.ConformanceIt("should be able to change the type from ClusterIP to ExternalName", func(ctx context.Context) {
  1498  		serviceName := "clusterip-service"
  1499  		ns := f.Namespace.Name
  1500  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1501  
  1502  		ginkgo.By("creating a service " + serviceName + " with the type=ClusterIP in namespace " + ns)
  1503  		_, err := jig.CreateTCPService(ctx, nil)
  1504  		framework.ExpectNoError(err)
  1505  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1506  			framework.Logf("Cleaning up the ClusterIP to ExternalName test service")
  1507  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  1508  			framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
  1509  		})
  1510  
  1511  		ginkgo.By("Creating active service to test reachability when its FQDN is referred as externalName for another service")
  1512  		externalServiceName := "externalsvc"
  1513  		externalServiceFQDN := createAndGetExternalServiceFQDN(ctx, cs, ns, externalServiceName)
  1514  		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, externalServiceName)
  1515  
  1516  		ginkgo.By("changing the ClusterIP service to type=ExternalName")
  1517  		externalNameService, err := jig.UpdateService(ctx, func(s *v1.Service) {
  1518  			s.Spec.Type = v1.ServiceTypeExternalName
  1519  			s.Spec.ExternalName = externalServiceFQDN
  1520  		})
  1521  		framework.ExpectNoError(err)
  1522  		if externalNameService.Spec.ClusterIP != "" {
  1523  			framework.Failf("Spec.ClusterIP was not cleared")
  1524  		}
  1525  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  1526  		err = jig.CheckServiceReachability(ctx, externalNameService, execPod)
  1527  		framework.ExpectNoError(err)
  1528  	})
  1529  
  1530  	/*
  1531  		Release: v1.16
  1532  		Testname: Service, change type, NodePort to ExternalName
  1533  		Description: Create a service of type NodePort. Service creation MUST be successful by exposing service on every node's IP on dynamically assigned NodePort and, ClusterIP MUST be assigned to route service requests.
  1534  		Update the service type from NodePort to ExternalName by setting CNAME entry as externalName. Service update MUST be successful and, MUST not has ClusterIP associated with the service and, allocated NodePort MUST be released.
  1535  		Service MUST be able to resolve to IP address by returning A records ensuring service is pointing to provided externalName.
  1536  	*/
  1537  	framework.ConformanceIt("should be able to change the type from NodePort to ExternalName", func(ctx context.Context) {
  1538  		serviceName := "nodeport-service"
  1539  		ns := f.Namespace.Name
  1540  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1541  
  1542  		ginkgo.By("creating a service " + serviceName + " with the type=NodePort in namespace " + ns)
  1543  		_, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  1544  			svc.Spec.Type = v1.ServiceTypeNodePort
  1545  		})
  1546  		framework.ExpectNoError(err)
  1547  		ginkgo.DeferCleanup(func(ctx context.Context) {
  1548  			framework.Logf("Cleaning up the NodePort to ExternalName test service")
  1549  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  1550  			framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
  1551  		})
  1552  
  1553  		ginkgo.By("Creating active service to test reachability when its FQDN is referred as externalName for another service")
  1554  		externalServiceName := "externalsvc"
  1555  		externalServiceFQDN := createAndGetExternalServiceFQDN(ctx, cs, ns, externalServiceName)
  1556  		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, externalServiceName)
  1557  
  1558  		ginkgo.By("changing the NodePort service to type=ExternalName")
  1559  		externalNameService, err := jig.UpdateService(ctx, func(s *v1.Service) {
  1560  			s.Spec.Type = v1.ServiceTypeExternalName
  1561  			s.Spec.ExternalName = externalServiceFQDN
  1562  		})
  1563  		framework.ExpectNoError(err)
  1564  		if externalNameService.Spec.ClusterIP != "" {
  1565  			framework.Failf("Spec.ClusterIP was not cleared")
  1566  		}
  1567  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  1568  		err = jig.CheckServiceReachability(ctx, externalNameService, execPod)
  1569  		framework.ExpectNoError(err)
  1570  	})
  1571  
  1572  	ginkgo.It("should prevent NodePort collisions", func(ctx context.Context) {
  1573  		// TODO: use the ServiceTestJig here
  1574  		baseName := "nodeport-collision-"
  1575  		serviceName1 := baseName + "1"
  1576  		serviceName2 := baseName + "2"
  1577  		ns := f.Namespace.Name
  1578  
  1579  		t := NewServerTest(cs, ns, serviceName1)
  1580  		defer func() {
  1581  			defer ginkgo.GinkgoRecover()
  1582  			errs := t.Cleanup()
  1583  			if len(errs) != 0 {
  1584  				framework.Failf("errors in cleanup: %v", errs)
  1585  			}
  1586  		}()
  1587  
  1588  		ginkgo.By("creating service " + serviceName1 + " with type NodePort in namespace " + ns)
  1589  		service := t.BuildServiceSpec()
  1590  		service.Spec.Type = v1.ServiceTypeNodePort
  1591  		result, err := t.CreateService(service)
  1592  		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName1, ns)
  1593  
  1594  		if result.Spec.Type != v1.ServiceTypeNodePort {
  1595  			framework.Failf("got unexpected Spec.Type for new service: %v", result)
  1596  		}
  1597  		if len(result.Spec.Ports) != 1 {
  1598  			framework.Failf("got unexpected len(Spec.Ports) for new service: %v", result)
  1599  		}
  1600  		port := result.Spec.Ports[0]
  1601  		if port.NodePort == 0 {
  1602  			framework.Failf("got unexpected Spec.Ports[0].NodePort for new service: %v", result)
  1603  		}
  1604  
  1605  		ginkgo.By("creating service " + serviceName2 + " with conflicting NodePort")
  1606  		service2 := t.BuildServiceSpec()
  1607  		service2.Name = serviceName2
  1608  		service2.Spec.Type = v1.ServiceTypeNodePort
  1609  		service2.Spec.Ports[0].NodePort = port.NodePort
  1610  		result2, err := t.CreateService(service2)
  1611  		if err == nil {
  1612  			framework.Failf("Created service with conflicting NodePort: %v", result2)
  1613  		}
  1614  		expectedErr := fmt.Sprintf("%d.*port is already allocated", port.NodePort)
  1615  		gomega.Expect(fmt.Sprintf("%v", err)).To(gomega.MatchRegexp(expectedErr))
  1616  
  1617  		ginkgo.By("deleting service " + serviceName1 + " to release NodePort")
  1618  		err = t.DeleteService(serviceName1)
  1619  		framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName1, ns)
  1620  
  1621  		ginkgo.By("creating service " + serviceName2 + " with no-longer-conflicting NodePort")
  1622  		_, err = t.CreateService(service2)
  1623  		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName1, ns)
  1624  	})
  1625  
  1626  	ginkgo.It("should check NodePort out-of-range", func(ctx context.Context) {
  1627  		// TODO: use the ServiceTestJig here
  1628  		serviceName := "nodeport-range-test"
  1629  		ns := f.Namespace.Name
  1630  
  1631  		t := NewServerTest(cs, ns, serviceName)
  1632  		defer func() {
  1633  			defer ginkgo.GinkgoRecover()
  1634  			errs := t.Cleanup()
  1635  			if len(errs) != 0 {
  1636  				framework.Failf("errors in cleanup: %v", errs)
  1637  			}
  1638  		}()
  1639  
  1640  		service := t.BuildServiceSpec()
  1641  		service.Spec.Type = v1.ServiceTypeNodePort
  1642  
  1643  		ginkgo.By("creating service " + serviceName + " with type NodePort in namespace " + ns)
  1644  		service, err := t.CreateService(service)
  1645  		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
  1646  
  1647  		if service.Spec.Type != v1.ServiceTypeNodePort {
  1648  			framework.Failf("got unexpected Spec.Type for new service: %v", service)
  1649  		}
  1650  		if len(service.Spec.Ports) != 1 {
  1651  			framework.Failf("got unexpected len(Spec.Ports) for new service: %v", service)
  1652  		}
  1653  		port := service.Spec.Ports[0]
  1654  		if port.NodePort == 0 {
  1655  			framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", service)
  1656  		}
  1657  		if !e2eservice.NodePortRange.Contains(int(port.NodePort)) {
  1658  			framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
  1659  		}
  1660  
  1661  		outOfRangeNodePort := 0
  1662  		for {
  1663  			outOfRangeNodePort = 1 + rand.Intn(65535)
  1664  			if !e2eservice.NodePortRange.Contains(outOfRangeNodePort) {
  1665  				break
  1666  			}
  1667  		}
  1668  		ginkgo.By(fmt.Sprintf("changing service "+serviceName+" to out-of-range NodePort %d", outOfRangeNodePort))
  1669  		result, err := e2eservice.UpdateService(ctx, cs, ns, serviceName, func(s *v1.Service) {
  1670  			s.Spec.Ports[0].NodePort = int32(outOfRangeNodePort)
  1671  		})
  1672  		if err == nil {
  1673  			framework.Failf("failed to prevent update of service with out-of-range NodePort: %v", result)
  1674  		}
  1675  		expectedErr := fmt.Sprintf("%d.*port is not in the valid range", outOfRangeNodePort)
  1676  		gomega.Expect(fmt.Sprintf("%v", err)).To(gomega.MatchRegexp(expectedErr))
  1677  
  1678  		ginkgo.By("deleting original service " + serviceName)
  1679  		err = t.DeleteService(serviceName)
  1680  		framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
  1681  
  1682  		ginkgo.By(fmt.Sprintf("creating service "+serviceName+" with out-of-range NodePort %d", outOfRangeNodePort))
  1683  		service = t.BuildServiceSpec()
  1684  		service.Spec.Type = v1.ServiceTypeNodePort
  1685  		service.Spec.Ports[0].NodePort = int32(outOfRangeNodePort)
  1686  		service, err = t.CreateService(service)
  1687  		if err == nil {
  1688  			framework.Failf("failed to prevent create of service with out-of-range NodePort (%d): %v", outOfRangeNodePort, service)
  1689  		}
  1690  		gomega.Expect(fmt.Sprintf("%v", err)).To(gomega.MatchRegexp(expectedErr))
  1691  	})
  1692  
  1693  	ginkgo.It("should release NodePorts on delete", func(ctx context.Context) {
  1694  		// TODO: use the ServiceTestJig here
  1695  		serviceName := "nodeport-reuse"
  1696  		ns := f.Namespace.Name
  1697  
  1698  		t := NewServerTest(cs, ns, serviceName)
  1699  		defer func() {
  1700  			defer ginkgo.GinkgoRecover()
  1701  			errs := t.Cleanup()
  1702  			if len(errs) != 0 {
  1703  				framework.Failf("errors in cleanup: %v", errs)
  1704  			}
  1705  		}()
  1706  
  1707  		service := t.BuildServiceSpec()
  1708  		service.Spec.Type = v1.ServiceTypeNodePort
  1709  
  1710  		ginkgo.By("creating service " + serviceName + " with type NodePort in namespace " + ns)
  1711  		service, err := t.CreateService(service)
  1712  		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
  1713  
  1714  		if service.Spec.Type != v1.ServiceTypeNodePort {
  1715  			framework.Failf("got unexpected Spec.Type for new service: %v", service)
  1716  		}
  1717  		if len(service.Spec.Ports) != 1 {
  1718  			framework.Failf("got unexpected len(Spec.Ports) for new service: %v", service)
  1719  		}
  1720  		port := service.Spec.Ports[0]
  1721  		if port.NodePort == 0 {
  1722  			framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", service)
  1723  		}
  1724  		if !e2eservice.NodePortRange.Contains(int(port.NodePort)) {
  1725  			framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
  1726  		}
  1727  		nodePort := port.NodePort
  1728  
  1729  		ginkgo.By("deleting original service " + serviceName)
  1730  		err = t.DeleteService(serviceName)
  1731  		framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
  1732  
  1733  		hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "hostexec")
  1734  		cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)
  1735  		var stdout string
  1736  		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
  1737  			var err error
  1738  			stdout, err = e2eoutput.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
  1739  			if err != nil {
  1740  				framework.Logf("expected node port (%d) to not be in use, stdout: %v", nodePort, stdout)
  1741  				return false, nil
  1742  			}
  1743  			return true, nil
  1744  		}); pollErr != nil {
  1745  			framework.Failf("expected node port (%d) to not be in use in %v, stdout: %v", nodePort, e2eservice.KubeProxyLagTimeout, stdout)
  1746  		}
  1747  
  1748  		ginkgo.By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort))
  1749  		service = t.BuildServiceSpec()
  1750  		service.Spec.Type = v1.ServiceTypeNodePort
  1751  		service.Spec.Ports[0].NodePort = nodePort
  1752  		_, err = t.CreateService(service)
  1753  		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
  1754  	})
  1755  
  1756  	ginkgo.It("should create endpoints for unready pods", func(ctx context.Context) {
  1757  		serviceName := "tolerate-unready"
  1758  		ns := f.Namespace.Name
  1759  
  1760  		t := NewServerTest(cs, ns, serviceName)
  1761  		defer func() {
  1762  			defer ginkgo.GinkgoRecover()
  1763  			errs := t.Cleanup()
  1764  			if len(errs) != 0 {
  1765  				framework.Failf("errors in cleanup: %v", errs)
  1766  			}
  1767  		}()
  1768  
  1769  		t.Name = "slow-terminating-unready-pod"
  1770  		t.Image = imageutils.GetE2EImage(imageutils.Agnhost)
  1771  		port := int32(80)
  1772  		terminateSeconds := int64(100)
  1773  
  1774  		service := &v1.Service{
  1775  			ObjectMeta: metav1.ObjectMeta{
  1776  				Name:      t.ServiceName,
  1777  				Namespace: t.Namespace,
  1778  			},
  1779  			Spec: v1.ServiceSpec{
  1780  				Selector: t.Labels,
  1781  				Ports: []v1.ServicePort{{
  1782  					Name:       "http",
  1783  					Port:       port,
  1784  					TargetPort: intstr.FromInt32(port),
  1785  				}},
  1786  				PublishNotReadyAddresses: true,
  1787  			},
  1788  		}
  1789  		rcSpec := e2erc.ByNameContainer(t.Name, 1, t.Labels, v1.Container{
  1790  			Args:  []string{"netexec", fmt.Sprintf("--http-port=%d", port)},
  1791  			Name:  t.Name,
  1792  			Image: t.Image,
  1793  			Ports: []v1.ContainerPort{{ContainerPort: int32(port), Protocol: v1.ProtocolTCP}},
  1794  			ReadinessProbe: &v1.Probe{
  1795  				ProbeHandler: v1.ProbeHandler{
  1796  					Exec: &v1.ExecAction{
  1797  						Command: []string{"/bin/false"},
  1798  					},
  1799  				},
  1800  			},
  1801  			Lifecycle: &v1.Lifecycle{
  1802  				PreStop: &v1.LifecycleHandler{
  1803  					Exec: &v1.ExecAction{
  1804  						Command: []string{"/bin/sleep", fmt.Sprintf("%d", terminateSeconds)},
  1805  					},
  1806  				},
  1807  			},
  1808  		}, nil)
  1809  		rcSpec.Spec.Template.Spec.TerminationGracePeriodSeconds = &terminateSeconds
  1810  
  1811  		ginkgo.By(fmt.Sprintf("creating RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
  1812  		_, err := t.CreateRC(rcSpec)
  1813  		framework.ExpectNoError(err)
  1814  
  1815  		ginkgo.By(fmt.Sprintf("creating Service %v with selectors %v", service.Name, service.Spec.Selector))
  1816  		_, err = t.CreateService(service)
  1817  		framework.ExpectNoError(err)
  1818  
  1819  		ginkgo.By("Verifying pods for RC " + t.Name)
  1820  		framework.ExpectNoError(e2epod.VerifyPods(ctx, t.Client, t.Namespace, t.Name, false, 1))
  1821  
  1822  		svcName := fmt.Sprintf("%v.%v.svc.%v", serviceName, f.Namespace.Name, framework.TestContext.ClusterDNSDomain)
  1823  		ginkgo.By("Waiting for endpoints of Service with DNS name " + svcName)
  1824  
  1825  		execPod := e2epod.CreateExecPodOrFail(ctx, f.ClientSet, f.Namespace.Name, "execpod-", nil)
  1826  		execPodName := execPod.Name
  1827  		cmd := fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/", svcName, port)
  1828  		var stdout string
  1829  		if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
  1830  			var err error
  1831  			stdout, err = e2eoutput.RunHostCmd(f.Namespace.Name, execPodName, cmd)
  1832  			if err != nil {
  1833  				framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.Name, stdout, err)
  1834  				return false, nil
  1835  			}
  1836  			return true, nil
  1837  		}); pollErr != nil {
  1838  			framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.Name, e2eservice.KubeProxyLagTimeout, stdout)
  1839  		}
  1840  
  1841  		ginkgo.By("Scaling down replication controller to zero")
  1842  		e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false)
  1843  
  1844  		ginkgo.By("Update service to not tolerate unready services")
  1845  		_, err = e2eservice.UpdateService(ctx, f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
  1846  			s.Spec.PublishNotReadyAddresses = false
  1847  		})
  1848  		framework.ExpectNoError(err)
  1849  
  1850  		ginkgo.By("Check if pod is unreachable")
  1851  		cmd = fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/; test \"$?\" -ne \"0\"", svcName, port)
  1852  		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
  1853  			var err error
  1854  			stdout, err = e2eoutput.RunHostCmd(f.Namespace.Name, execPodName, cmd)
  1855  			if err != nil {
  1856  				framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.Name, stdout, err)
  1857  				return false, nil
  1858  			}
  1859  			return true, nil
  1860  		}); pollErr != nil {
  1861  			framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.Name, e2eservice.KubeProxyLagTimeout, stdout)
  1862  		}
  1863  
  1864  		ginkgo.By("Update service to tolerate unready services again")
  1865  		_, err = e2eservice.UpdateService(ctx, f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
  1866  			s.Spec.PublishNotReadyAddresses = true
  1867  		})
  1868  		framework.ExpectNoError(err)
  1869  
  1870  		ginkgo.By("Check if terminating pod is available through service")
  1871  		cmd = fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/", svcName, port)
  1872  		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
  1873  			var err error
  1874  			stdout, err = e2eoutput.RunHostCmd(f.Namespace.Name, execPodName, cmd)
  1875  			if err != nil {
  1876  				framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.Name, stdout, err)
  1877  				return false, nil
  1878  			}
  1879  			return true, nil
  1880  		}); pollErr != nil {
  1881  			framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.Name, e2eservice.KubeProxyLagTimeout, stdout)
  1882  		}
  1883  
  1884  		ginkgo.By("Remove pods immediately")
  1885  		label := labels.SelectorFromSet(labels.Set(t.Labels))
  1886  		options := metav1.ListOptions{LabelSelector: label.String()}
  1887  		podClient := t.Client.CoreV1().Pods(f.Namespace.Name)
  1888  		pods, err := podClient.List(ctx, options)
  1889  		if err != nil {
  1890  			framework.Logf("warning: error retrieving pods: %s", err)
  1891  		} else {
  1892  			for _, pod := range pods.Items {
  1893  				var gracePeriodSeconds int64 = 0
  1894  				err := podClient.Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
  1895  				if err != nil {
  1896  					framework.Logf("warning: error force deleting pod '%s': %s", pod.Name, err)
  1897  				}
  1898  			}
  1899  		}
  1900  	})
  1901  
  1902  	ginkgo.It("should be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is true", func(ctx context.Context) {
  1903  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  1904  		framework.ExpectNoError(err)
  1905  		nodeCounts := len(nodes.Items)
  1906  		if nodeCounts < 2 {
  1907  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  1908  		}
  1909  		node0 := nodes.Items[0]
  1910  		node1 := nodes.Items[1]
  1911  
  1912  		serviceName := "svc-tolerate-unready"
  1913  		ns := f.Namespace.Name
  1914  		servicePort := 80
  1915  
  1916  		ginkgo.By("creating a NodePort TCP service " + serviceName + " that PublishNotReadyAddresses on" + ns)
  1917  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  1918  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  1919  			svc.Spec.Ports = []v1.ServicePort{
  1920  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
  1921  			}
  1922  			svc.Spec.Type = v1.ServiceTypeNodePort
  1923  			svc.Spec.PublishNotReadyAddresses = true
  1924  		})
  1925  		framework.ExpectNoError(err, "failed to create Service")
  1926  
  1927  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  1928  		gracePeriod := int64(300)
  1929  		webserverPod0 := &v1.Pod{
  1930  			ObjectMeta: metav1.ObjectMeta{
  1931  				Name: "webserver-pod",
  1932  			},
  1933  			Spec: v1.PodSpec{
  1934  				Containers: []v1.Container{
  1935  					{
  1936  						Name:  "agnhost",
  1937  						Image: imageutils.GetE2EImage(imageutils.Agnhost),
  1938  						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
  1939  						Ports: []v1.ContainerPort{
  1940  							{
  1941  								ContainerPort: 80,
  1942  							},
  1943  						},
  1944  						ReadinessProbe: &v1.Probe{
  1945  							ProbeHandler: v1.ProbeHandler{
  1946  								HTTPGet: &v1.HTTPGetAction{
  1947  									Path: "/readyz",
  1948  									Port: intstr.IntOrString{
  1949  										IntVal: int32(80),
  1950  									},
  1951  									Scheme: v1.URISchemeHTTP,
  1952  								},
  1953  							},
  1954  						},
  1955  						LivenessProbe: &v1.Probe{
  1956  							ProbeHandler: v1.ProbeHandler{
  1957  								HTTPGet: &v1.HTTPGetAction{
  1958  									Path: "/healthz",
  1959  									Port: intstr.IntOrString{
  1960  										IntVal: int32(80),
  1961  									},
  1962  									Scheme: v1.URISchemeHTTP,
  1963  								},
  1964  							},
  1965  						},
  1966  					},
  1967  				},
  1968  			},
  1969  		}
  1970  		webserverPod0.Labels = jig.Labels
  1971  		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
  1972  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  1973  
  1974  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  1975  		framework.ExpectNoError(err, "failed to create pod")
  1976  		err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
  1977  		if err != nil {
  1978  			framework.Failf("error waiting for pod %s to be ready %v", webserverPod0.Name, err)
  1979  		}
  1980  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  1981  
  1982  		ginkgo.By("Creating 1 pause pods that will try to connect to the webservers")
  1983  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  1984  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  1985  
  1986  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  1987  		framework.ExpectNoError(err, "failed to create pod")
  1988  		err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
  1989  		if err != nil {
  1990  			framework.Failf("error waiting for pod %s to be ready %v", pausePod1.Name, err)
  1991  		}
  1992  
  1993  		// webserver should continue to serve traffic through the Service after delete since:
  1994  		//  - it has a 100s termination grace period
  1995  		//  - it is unready but PublishNotReadyAddresses is true
  1996  		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
  1997  		framework.ExpectNoError(err)
  1998  
  1999  		// Wait until the pod becomes unready
  2000  		err = e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
  2001  			return !podutil.IsPodReady(pod), nil
  2002  		})
  2003  		if err != nil {
  2004  			framework.Failf("error waiting for pod %s to be unready %v", webserverPod0.Name, err)
  2005  		}
  2006  		// assert 5 times that the pause pod can connect to the Service
  2007  		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
  2008  		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
  2009  		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
  2010  		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
  2011  		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
  2012  		// connect 3 times every 5 seconds to the Service with the unready and terminating endpoint
  2013  		for i := 0; i < 5; i++ {
  2014  			execHostnameTest(*pausePod1, clusterIPAddress, webserverPod0.Name)
  2015  			execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
  2016  			execHostnameTest(*pausePod1, nodePortAddress1, webserverPod0.Name)
  2017  			time.Sleep(5 * time.Second)
  2018  		}
  2019  	})
  2020  
  2021  	ginkgo.It("should not be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is false", func(ctx context.Context) {
  2022  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  2023  		framework.ExpectNoError(err)
  2024  		nodeCounts := len(nodes.Items)
  2025  		if nodeCounts < 2 {
  2026  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  2027  		}
  2028  		node0 := nodes.Items[0]
  2029  		node1 := nodes.Items[1]
  2030  
  2031  		serviceName := "svc-not-tolerate-unready"
  2032  		ns := f.Namespace.Name
  2033  		servicePort := 80
  2034  
  2035  		ginkgo.By("creating a NodePort TCP service " + serviceName + " that PublishNotReadyAddresses on" + ns)
  2036  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  2037  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  2038  			svc.Spec.Ports = []v1.ServicePort{
  2039  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
  2040  			}
  2041  			svc.Spec.Type = v1.ServiceTypeNodePort
  2042  			svc.Spec.PublishNotReadyAddresses = false
  2043  		})
  2044  		framework.ExpectNoError(err, "failed to create Service")
  2045  
  2046  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  2047  		gracePeriod := int64(300)
  2048  		webserverPod0 := &v1.Pod{
  2049  			ObjectMeta: metav1.ObjectMeta{
  2050  				Name: "webserver-pod",
  2051  			},
  2052  			Spec: v1.PodSpec{
  2053  				Containers: []v1.Container{
  2054  					{
  2055  						Name:  "agnhost",
  2056  						Image: imageutils.GetE2EImage(imageutils.Agnhost),
  2057  						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
  2058  						Ports: []v1.ContainerPort{
  2059  							{
  2060  								ContainerPort: 80,
  2061  							},
  2062  						},
  2063  						ReadinessProbe: &v1.Probe{
  2064  							ProbeHandler: v1.ProbeHandler{
  2065  								HTTPGet: &v1.HTTPGetAction{
  2066  									Path: "/readyz",
  2067  									Port: intstr.IntOrString{
  2068  										IntVal: int32(80),
  2069  									},
  2070  									Scheme: v1.URISchemeHTTP,
  2071  								},
  2072  							},
  2073  						},
  2074  						LivenessProbe: &v1.Probe{
  2075  							ProbeHandler: v1.ProbeHandler{
  2076  								HTTPGet: &v1.HTTPGetAction{
  2077  									Path: "/healthz",
  2078  									Port: intstr.IntOrString{
  2079  										IntVal: int32(80),
  2080  									},
  2081  									Scheme: v1.URISchemeHTTP,
  2082  								},
  2083  							},
  2084  						},
  2085  					},
  2086  				},
  2087  			},
  2088  		}
  2089  		webserverPod0.Labels = jig.Labels
  2090  		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
  2091  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2092  
  2093  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  2094  		framework.ExpectNoError(err, "failed to create pod")
  2095  		err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
  2096  		if err != nil {
  2097  			framework.Failf("error waiting for pod %s to be ready %v", webserverPod0.Name, err)
  2098  		}
  2099  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  2100  
  2101  		ginkgo.By("Creating 1 pause pods that will try to connect to the webservers")
  2102  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  2103  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  2104  
  2105  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  2106  		framework.ExpectNoError(err, "failed to create pod")
  2107  		err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
  2108  		if err != nil {
  2109  			framework.Failf("error waiting for pod %s to be ready %v", pausePod1.Name, err)
  2110  		}
  2111  
  2112  		// webserver should stop to serve traffic through the Service after delete since:
  2113  		//  - it has a 100s termination grace period
  2114  		//  - it is unready but PublishNotReadyAddresses is false
  2115  		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
  2116  		framework.ExpectNoError(err)
  2117  
  2118  		// Wait until the pod becomes unready
  2119  		err = e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
  2120  			return !podutil.IsPodReady(pod), nil
  2121  		})
  2122  		if err != nil {
  2123  			framework.Failf("error waiting for pod %s to be unready %v", webserverPod0.Name, err)
  2124  		}
  2125  
  2126  		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
  2127  		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
  2128  		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
  2129  		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
  2130  
  2131  		// Wait until the change has been propagated to both nodes.
  2132  		cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress0)
  2133  		if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, true, func(_ context.Context) (bool, error) {
  2134  			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2135  			if err != nil {
  2136  				return true, nil
  2137  			}
  2138  			return false, nil
  2139  		}); pollErr != nil {
  2140  			framework.ExpectNoError(pollErr, "pod on node0 still serves traffic")
  2141  		}
  2142  		cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
  2143  		if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, true, func(_ context.Context) (bool, error) {
  2144  			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2145  			if err != nil {
  2146  				return true, nil
  2147  			}
  2148  			return false, nil
  2149  		}); pollErr != nil {
  2150  			framework.ExpectNoError(pollErr, "pod on node1 still serves traffic")
  2151  		}
  2152  
  2153  		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
  2154  		// connect 3 times every 5 seconds to the Service and expect a failure
  2155  		for i := 0; i < 5; i++ {
  2156  			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
  2157  			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2158  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
  2159  
  2160  			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress0)
  2161  			_, err = e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2162  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to NodePort address")
  2163  
  2164  			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
  2165  			_, err = e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2166  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to NodePort address")
  2167  
  2168  			time.Sleep(5 * time.Second)
  2169  		}
  2170  	})
  2171  
  2172  	/*
  2173  		Release: v1.19
  2174  		Testname: Service, ClusterIP type, session affinity to ClientIP
  2175  		Description: Create a service of type "ClusterIP". Service's sessionAffinity is set to "ClientIP". Service creation MUST be successful by assigning "ClusterIP" to the service.
  2176  		Create a Replication Controller to ensure that 3 pods are running and are targeted by the service to serve hostname of the pod when requests are sent to the service.
  2177  		Create another pod to make requests to the service. Service MUST serve the hostname from the same pod of the replica for all consecutive requests.
  2178  		Service MUST be reachable over serviceName and the ClusterIP on servicePort.
  2179  		[LinuxOnly]: Windows does not support session affinity.
  2180  	*/
  2181  	framework.ConformanceIt("should have session affinity work for service with type clusterIP [LinuxOnly]", func(ctx context.Context) {
  2182  		svc := getServeHostnameService("affinity-clusterip")
  2183  		svc.Spec.Type = v1.ServiceTypeClusterIP
  2184  		execAffinityTestForNonLBService(ctx, f, cs, svc)
  2185  	})
  2186  
  2187  	ginkgo.It("should have session affinity timeout work for service with type clusterIP [LinuxOnly]", func(ctx context.Context) {
  2188  		svc := getServeHostnameService("affinity-clusterip-timeout")
  2189  		svc.Spec.Type = v1.ServiceTypeClusterIP
  2190  		execAffinityTestForSessionAffinityTimeout(ctx, f, cs, svc)
  2191  	})
  2192  
  2193  	/*
  2194  		Release: v1.19
  2195  		Testname: Service, ClusterIP type, session affinity to None
  2196  		Description: Create a service of type "ClusterIP". Service's sessionAffinity is set to "ClientIP". Service creation MUST be successful by assigning "ClusterIP" to the service.
  2197  		Create a Replication Controller to ensure that 3 pods are running and are targeted by the service to serve hostname of the pod when requests are sent to the service.
  2198  		Create another pod to make requests to the service. Update the service's sessionAffinity to "None". Service update MUST be successful. When a requests are made to the service, it MUST be able serve the hostname from any pod of the replica.
  2199  		When service's sessionAffinily is updated back to "ClientIP", service MUST serve the hostname from the same pod of the replica for all consecutive requests.
  2200  		Service MUST be reachable over serviceName and the ClusterIP on servicePort.
  2201  		[LinuxOnly]: Windows does not support session affinity.
  2202  	*/
  2203  	framework.ConformanceIt("should be able to switch session affinity for service with type clusterIP [LinuxOnly]", func(ctx context.Context) {
  2204  		svc := getServeHostnameService("affinity-clusterip-transition")
  2205  		svc.Spec.Type = v1.ServiceTypeClusterIP
  2206  		execAffinityTestForNonLBServiceWithTransition(ctx, f, cs, svc)
  2207  	})
  2208  
  2209  	/*
  2210  		Release: v1.19
  2211  		Testname: Service, NodePort type, session affinity to ClientIP
  2212  		Description: Create a service of type "NodePort" and provide service port and protocol. Service's sessionAffinity is set to "ClientIP". Service creation MUST be successful by assigning a "ClusterIP" to service and allocating NodePort on all nodes.
  2213  		Create a Replication Controller to ensure that 3 pods are running and are targeted by the service to serve hostname of the pod when a requests are sent to the service.
  2214  		Create another pod to make requests to the service on node's IP and NodePort. Service MUST serve the hostname from the same pod of the replica for all consecutive requests.
  2215  		Service MUST be reachable over serviceName and the ClusterIP on servicePort. Service MUST also be reachable over node's IP on NodePort.
  2216  		[LinuxOnly]: Windows does not support session affinity.
  2217  	*/
  2218  	framework.ConformanceIt("should have session affinity work for NodePort service [LinuxOnly]", func(ctx context.Context) {
  2219  		svc := getServeHostnameService("affinity-nodeport")
  2220  		svc.Spec.Type = v1.ServiceTypeNodePort
  2221  		execAffinityTestForNonLBService(ctx, f, cs, svc)
  2222  	})
  2223  
  2224  	ginkgo.It("should have session affinity timeout work for NodePort service [LinuxOnly]", func(ctx context.Context) {
  2225  		svc := getServeHostnameService("affinity-nodeport-timeout")
  2226  		svc.Spec.Type = v1.ServiceTypeNodePort
  2227  		execAffinityTestForSessionAffinityTimeout(ctx, f, cs, svc)
  2228  	})
  2229  
  2230  	/*
  2231  		Release: v1.19
  2232  		Testname: Service, NodePort type, session affinity to None
  2233  		Description: Create a service of type "NodePort" and provide service port and protocol. Service's sessionAffinity is set to "ClientIP". Service creation MUST be successful by assigning a "ClusterIP" to the service and allocating NodePort on all the nodes.
  2234  		Create a Replication Controller to ensure that 3 pods are running and are targeted by the service to serve hostname of the pod when requests are sent to the service.
  2235  		Create another pod to make requests to the service. Update the service's sessionAffinity to "None". Service update MUST be successful. When a requests are made to the service on node's IP and NodePort, service MUST be able serve the hostname from any pod of the replica.
  2236  		When service's sessionAffinily is updated back to "ClientIP", service MUST serve the hostname from the same pod of the replica for all consecutive requests.
  2237  		Service MUST be reachable over serviceName and the ClusterIP on servicePort. Service MUST also be reachable over node's IP on NodePort.
  2238  		[LinuxOnly]: Windows does not support session affinity.
  2239  	*/
  2240  	framework.ConformanceIt("should be able to switch session affinity for NodePort service [LinuxOnly]", func(ctx context.Context) {
  2241  		svc := getServeHostnameService("affinity-nodeport-transition")
  2242  		svc.Spec.Type = v1.ServiceTypeNodePort
  2243  		execAffinityTestForNonLBServiceWithTransition(ctx, f, cs, svc)
  2244  	})
  2245  
  2246  	ginkgo.It("should implement service.kubernetes.io/service-proxy-name", func(ctx context.Context) {
  2247  		ns := f.Namespace.Name
  2248  		numPods, servicePort := 3, defaultServeHostnameServicePort
  2249  		serviceProxyNameLabels := map[string]string{"service.kubernetes.io/service-proxy-name": "foo-bar"}
  2250  
  2251  		// We will create 2 services to test creating services in both states and also dynamic updates
  2252  		// svcDisabled: Created with the label, will always be disabled. We create this early and
  2253  		//              test again late to make sure it never becomes available.
  2254  		// svcToggled: Created without the label then the label is toggled verifying reachability at each step.
  2255  
  2256  		ginkgo.By("creating service-disabled in namespace " + ns)
  2257  		svcDisabled := getServeHostnameService("service-proxy-disabled")
  2258  		svcDisabled.ObjectMeta.Labels = serviceProxyNameLabels
  2259  		_, svcDisabledIP, err := StartServeHostnameService(ctx, cs, svcDisabled, ns, numPods)
  2260  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcDisabledIP, ns)
  2261  
  2262  		ginkgo.By("creating service in namespace " + ns)
  2263  		svcToggled := getServeHostnameService("service-proxy-toggled")
  2264  		podToggledNames, svcToggledIP, err := StartServeHostnameService(ctx, cs, svcToggled, ns, numPods)
  2265  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcToggledIP, ns)
  2266  
  2267  		jig := e2eservice.NewTestJig(cs, ns, svcToggled.ObjectMeta.Name)
  2268  
  2269  		ginkgo.By("verifying service is up")
  2270  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podToggledNames, svcToggledIP, servicePort))
  2271  
  2272  		ginkgo.By("verifying service-disabled is not up")
  2273  		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcDisabledIP, servicePort))
  2274  
  2275  		ginkgo.By("adding service-proxy-name label")
  2276  		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  2277  			svc.ObjectMeta.Labels = serviceProxyNameLabels
  2278  		})
  2279  		framework.ExpectNoError(err)
  2280  
  2281  		ginkgo.By("verifying service is not up")
  2282  		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcToggledIP, servicePort))
  2283  
  2284  		ginkgo.By("removing service-proxy-name annotation")
  2285  		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  2286  			svc.ObjectMeta.Labels = nil
  2287  		})
  2288  		framework.ExpectNoError(err)
  2289  
  2290  		ginkgo.By("verifying service is up")
  2291  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podToggledNames, svcToggledIP, servicePort))
  2292  
  2293  		ginkgo.By("verifying service-disabled is still not up")
  2294  		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcDisabledIP, servicePort))
  2295  	})
  2296  
  2297  	ginkgo.It("should implement service.kubernetes.io/headless", func(ctx context.Context) {
  2298  		ns := f.Namespace.Name
  2299  		numPods, servicePort := 3, defaultServeHostnameServicePort
  2300  		serviceHeadlessLabels := map[string]string{v1.IsHeadlessService: ""}
  2301  
  2302  		// We will create 2 services to test creating services in both states and also dynamic updates
  2303  		// svcHeadless: Created with the label, will always be disabled. We create this early and
  2304  		//              test again late to make sure it never becomes available.
  2305  		// svcHeadlessToggled: Created without the label then the label is toggled verifying reachability at each step.
  2306  
  2307  		ginkgo.By("creating service-headless in namespace " + ns)
  2308  		svcHeadless := getServeHostnameService("service-headless")
  2309  		svcHeadless.ObjectMeta.Labels = serviceHeadlessLabels
  2310  		// This should be improved, as we do not want a Headlesss Service to contain an IP...
  2311  		_, svcHeadlessIP, err := StartServeHostnameService(ctx, cs, svcHeadless, ns, numPods)
  2312  		framework.ExpectNoError(err, "failed to create replication controller with headless service: %s in the namespace: %s", svcHeadlessIP, ns)
  2313  
  2314  		ginkgo.By("creating service in namespace " + ns)
  2315  		svcHeadlessToggled := getServeHostnameService("service-headless-toggled")
  2316  		podHeadlessToggledNames, svcHeadlessToggledIP, err := StartServeHostnameService(ctx, cs, svcHeadlessToggled, ns, numPods)
  2317  		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcHeadlessToggledIP, ns)
  2318  
  2319  		jig := e2eservice.NewTestJig(cs, ns, svcHeadlessToggled.ObjectMeta.Name)
  2320  
  2321  		ginkgo.By("verifying service is up")
  2322  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort))
  2323  
  2324  		ginkgo.By("verifying service-headless is not up")
  2325  		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcHeadlessIP, servicePort))
  2326  
  2327  		ginkgo.By("adding service.kubernetes.io/headless label")
  2328  		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  2329  			svc.ObjectMeta.Labels = serviceHeadlessLabels
  2330  		})
  2331  		framework.ExpectNoError(err)
  2332  
  2333  		ginkgo.By("verifying service is not up")
  2334  		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcHeadlessToggledIP, servicePort))
  2335  
  2336  		ginkgo.By("removing service.kubernetes.io/headless annotation")
  2337  		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  2338  			svc.ObjectMeta.Labels = nil
  2339  		})
  2340  		framework.ExpectNoError(err)
  2341  
  2342  		ginkgo.By("verifying service is up")
  2343  		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort))
  2344  
  2345  		ginkgo.By("verifying service-headless is still not up")
  2346  		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcHeadlessIP, servicePort))
  2347  	})
  2348  
  2349  	ginkgo.It("should be rejected when no endpoints exist", func(ctx context.Context) {
  2350  		namespace := f.Namespace.Name
  2351  		serviceName := "no-pods"
  2352  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
  2353  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
  2354  		framework.ExpectNoError(err)
  2355  		port := 80
  2356  
  2357  		ginkgo.By("creating a service with no endpoints")
  2358  		_, err = jig.CreateTCPServiceWithPort(ctx, nil, int32(port))
  2359  		framework.ExpectNoError(err)
  2360  
  2361  		nodeName := nodes.Items[0].Name
  2362  		podName := "execpod-noendpoints"
  2363  
  2364  		ginkgo.By(fmt.Sprintf("creating %v on node %v", podName, nodeName))
  2365  		execPod := e2epod.CreateExecPodOrFail(ctx, f.ClientSet, namespace, podName, func(pod *v1.Pod) {
  2366  			nodeSelection := e2epod.NodeSelection{Name: nodeName}
  2367  			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
  2368  		})
  2369  
  2370  		serviceAddress := net.JoinHostPort(serviceName, strconv.Itoa(port))
  2371  		framework.Logf("waiting up to %v to connect to %v", e2eservice.KubeProxyEndpointLagTimeout, serviceAddress)
  2372  		cmd := fmt.Sprintf("/agnhost connect --timeout=3s %s", serviceAddress)
  2373  
  2374  		ginkgo.By(fmt.Sprintf("hitting service %v from pod %v on node %v", serviceAddress, podName, nodeName))
  2375  		expectedErr := "REFUSED"
  2376  		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
  2377  			_, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
  2378  
  2379  			if err != nil {
  2380  				if strings.Contains(err.Error(), expectedErr) {
  2381  					framework.Logf("error contained '%s', as expected: %s", expectedErr, err.Error())
  2382  					return true, nil
  2383  				}
  2384  				framework.Logf("error didn't contain '%s', keep trying: %s", expectedErr, err.Error())
  2385  				return false, nil
  2386  			}
  2387  			return true, errors.New("expected connect call to fail")
  2388  		}); pollErr != nil {
  2389  			framework.ExpectNoError(pollErr)
  2390  		}
  2391  	})
  2392  
  2393  	// regression test for https://issues.k8s.io/109414 and https://issues.k8s.io/109718
  2394  	ginkgo.It("should be rejected for evicted pods (no endpoints exist)", func(ctx context.Context) {
  2395  		namespace := f.Namespace.Name
  2396  		serviceName := "evicted-pods"
  2397  		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
  2398  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
  2399  		framework.ExpectNoError(err)
  2400  		nodeName := nodes.Items[0].Name
  2401  
  2402  		port := 80
  2403  
  2404  		ginkgo.By("creating a service with no endpoints")
  2405  		_, err = jig.CreateTCPServiceWithPort(ctx, func(s *v1.Service) {
  2406  			// set publish not ready addresses to cover edge cases too
  2407  			s.Spec.PublishNotReadyAddresses = true
  2408  		}, int32(port))
  2409  		framework.ExpectNoError(err)
  2410  
  2411  		// Create a pod in one node to get evicted
  2412  		ginkgo.By("creating a client pod that is going to be evicted for the service " + serviceName)
  2413  		evictedPod := e2epod.NewAgnhostPod(namespace, "evicted-pod", nil, nil, nil)
  2414  		evictedPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "sleep 10; dd if=/dev/zero of=file bs=1M count=10; sleep 10000"}
  2415  		evictedPod.Spec.Containers[0].Name = "evicted-pod"
  2416  		evictedPod.Spec.Containers[0].Resources = v1.ResourceRequirements{
  2417  			Limits: v1.ResourceList{"ephemeral-storage": resource.MustParse("5Mi")},
  2418  		}
  2419  		e2epod.NewPodClient(f).Create(ctx, evictedPod)
  2420  		err = e2epod.WaitForPodTerminatedInNamespace(ctx, f.ClientSet, evictedPod.Name, "Evicted", f.Namespace.Name)
  2421  		if err != nil {
  2422  			framework.Failf("error waiting for pod to be evicted: %v", err)
  2423  		}
  2424  
  2425  		podName := "execpod-evictedpods"
  2426  		ginkgo.By(fmt.Sprintf("creating %v on node %v", podName, nodeName))
  2427  		execPod := e2epod.CreateExecPodOrFail(ctx, f.ClientSet, namespace, podName, func(pod *v1.Pod) {
  2428  			nodeSelection := e2epod.NodeSelection{Name: nodeName}
  2429  			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
  2430  		})
  2431  
  2432  		if epErr := wait.PollImmediate(framework.Poll, e2eservice.ServiceEndpointsTimeout, func() (bool, error) {
  2433  			endpoints, err := cs.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
  2434  			if err != nil {
  2435  				framework.Logf("error fetching '%s/%s' Endpoints: %s", namespace, serviceName, err.Error())
  2436  				return false, err
  2437  			}
  2438  			if len(endpoints.Subsets) > 0 {
  2439  				framework.Logf("expected '%s/%s' Endpoints to be empty, got: %v", namespace, serviceName, endpoints.Subsets)
  2440  				return false, nil
  2441  			}
  2442  			epsList, err := cs.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
  2443  			if err != nil {
  2444  				framework.Logf("error fetching '%s/%s' EndpointSlices: %s", namespace, serviceName, err.Error())
  2445  				return false, err
  2446  			}
  2447  			if len(epsList.Items) != 1 {
  2448  				framework.Logf("expected exactly 1 EndpointSlice, got: %d", len(epsList.Items))
  2449  				return false, nil
  2450  			}
  2451  			endpointSlice := epsList.Items[0]
  2452  			if len(endpointSlice.Endpoints) > 0 {
  2453  				framework.Logf("expected EndpointSlice to be empty, got %d endpoints", len(endpointSlice.Endpoints))
  2454  				return false, nil
  2455  			}
  2456  			return true, nil
  2457  		}); epErr != nil {
  2458  			framework.ExpectNoError(epErr)
  2459  		}
  2460  
  2461  		serviceAddress := net.JoinHostPort(serviceName, strconv.Itoa(port))
  2462  		framework.Logf("waiting up to %v to connect to %v", e2eservice.KubeProxyEndpointLagTimeout, serviceAddress)
  2463  		cmd := fmt.Sprintf("/agnhost connect --timeout=3s %s", serviceAddress)
  2464  
  2465  		ginkgo.By(fmt.Sprintf("hitting service %v from pod %v on node %v expected to be refused", serviceAddress, podName, nodeName))
  2466  		expectedErr := "REFUSED"
  2467  		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
  2468  			_, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
  2469  
  2470  			if err != nil {
  2471  				if strings.Contains(err.Error(), expectedErr) {
  2472  					framework.Logf("error contained '%s', as expected: %s", expectedErr, err.Error())
  2473  					return true, nil
  2474  				}
  2475  				framework.Logf("error didn't contain '%s', keep trying: %s", expectedErr, err.Error())
  2476  				return false, nil
  2477  			}
  2478  			return true, errors.New("expected connect call to fail")
  2479  		}); pollErr != nil {
  2480  			framework.ExpectNoError(pollErr)
  2481  		}
  2482  	})
  2483  
  2484  	ginkgo.It("should respect internalTrafficPolicy=Local Pod to Pod", func(ctx context.Context) {
  2485  		// windows kube-proxy does not support this feature yet
  2486  		// TODO: remove this skip when windows-based proxies implement internalTrafficPolicy
  2487  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  2488  
  2489  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  2490  		framework.ExpectNoError(err)
  2491  		nodeCounts := len(nodes.Items)
  2492  		if nodeCounts < 2 {
  2493  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  2494  		}
  2495  		node0 := nodes.Items[0]
  2496  		node1 := nodes.Items[1]
  2497  
  2498  		serviceName := "svc-itp"
  2499  		ns := f.Namespace.Name
  2500  		servicePort := 80
  2501  
  2502  		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP and internalTrafficPolicy=Local in namespace " + ns)
  2503  		local := v1.ServiceInternalTrafficPolicyLocal
  2504  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  2505  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  2506  			svc.Spec.Ports = []v1.ServicePort{
  2507  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
  2508  			}
  2509  			svc.Spec.InternalTrafficPolicy = &local
  2510  		})
  2511  		framework.ExpectNoError(err)
  2512  
  2513  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  2514  		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort))
  2515  		webserverPod0.Labels = jig.Labels
  2516  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2517  
  2518  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  2519  		framework.ExpectNoError(err)
  2520  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2521  
  2522  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  2523  
  2524  		ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
  2525  		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
  2526  		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2527  
  2528  		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
  2529  		framework.ExpectNoError(err)
  2530  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2531  
  2532  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  2533  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  2534  
  2535  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  2536  		framework.ExpectNoError(err)
  2537  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
  2538  
  2539  		// assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout
  2540  		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
  2541  		for i := 0; i < 5; i++ {
  2542  			// the first pause pod should be on the same node as the webserver, so it can connect to the local pod using clusterIP
  2543  			execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
  2544  
  2545  			// the second pause pod is on a different node, so it should see a connection error every time
  2546  			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
  2547  			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2548  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
  2549  		}
  2550  	})
  2551  
  2552  	ginkgo.It("should respect internalTrafficPolicy=Local Pod (hostNetwork: true) to Pod", func(ctx context.Context) {
  2553  		// windows kube-proxy does not support this feature yet
  2554  		// TODO: remove this skip when windows-based proxies implement internalTrafficPolicy
  2555  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  2556  
  2557  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  2558  		framework.ExpectNoError(err)
  2559  		nodeCounts := len(nodes.Items)
  2560  		if nodeCounts < 2 {
  2561  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  2562  		}
  2563  		node0 := nodes.Items[0]
  2564  		node1 := nodes.Items[1]
  2565  
  2566  		serviceName := "svc-itp"
  2567  		ns := f.Namespace.Name
  2568  		servicePort := 8000
  2569  
  2570  		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP and internalTrafficPolicy=Local in namespace " + ns)
  2571  		local := v1.ServiceInternalTrafficPolicyLocal
  2572  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  2573  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  2574  			svc.Spec.Ports = []v1.ServicePort{
  2575  				{Port: 8000, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(8000)},
  2576  			}
  2577  			svc.Spec.InternalTrafficPolicy = &local
  2578  		})
  2579  		framework.ExpectNoError(err)
  2580  
  2581  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  2582  		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort))
  2583  		webserverPod0.Labels = jig.Labels
  2584  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2585  
  2586  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  2587  		framework.ExpectNoError(err)
  2588  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2589  
  2590  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  2591  
  2592  		ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
  2593  		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
  2594  		pausePod0.Spec.HostNetwork = true
  2595  		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2596  
  2597  		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
  2598  		framework.ExpectNoError(err)
  2599  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2600  
  2601  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  2602  		pausePod1.Spec.HostNetwork = true
  2603  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  2604  
  2605  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  2606  		framework.ExpectNoError(err)
  2607  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
  2608  
  2609  		// assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout
  2610  		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
  2611  		for i := 0; i < 5; i++ {
  2612  			// the first pause pod should be on the same node as the webserver, so it can connect to the local pod using clusterIP
  2613  			execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
  2614  
  2615  			// the second pause pod is on a different node, so it should see a connection error every time
  2616  			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
  2617  			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2618  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
  2619  		}
  2620  	})
  2621  
  2622  	ginkgo.It("should respect internalTrafficPolicy=Local Pod and Node, to Pod (hostNetwork: true)", func(ctx context.Context) {
  2623  		// windows kube-proxy does not support this feature yet
  2624  		// TODO: remove this skip when windows-based proxies implement internalTrafficPolicy
  2625  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  2626  
  2627  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  2628  		framework.ExpectNoError(err)
  2629  		nodeCounts := len(nodes.Items)
  2630  		if nodeCounts < 2 {
  2631  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  2632  		}
  2633  		node0 := nodes.Items[0]
  2634  		node1 := nodes.Items[1]
  2635  
  2636  		serviceName := "svc-itp"
  2637  		ns := f.Namespace.Name
  2638  		servicePort := 80
  2639  		// If the pod can't bind to this port, it will fail to start, and it will fail the test,
  2640  		// because is using hostNetwork. Using a not common port will reduce this possibility.
  2641  		endpointPort := 10180
  2642  
  2643  		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP and internalTrafficPolicy=Local in namespace " + ns)
  2644  		local := v1.ServiceInternalTrafficPolicyLocal
  2645  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  2646  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  2647  			svc.Spec.Ports = []v1.ServicePort{
  2648  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(endpointPort)},
  2649  			}
  2650  			svc.Spec.InternalTrafficPolicy = &local
  2651  		})
  2652  		framework.ExpectNoError(err)
  2653  
  2654  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  2655  		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(endpointPort), "--udp-port", strconv.Itoa(endpointPort))
  2656  		webserverPod0.Labels = jig.Labels
  2657  		webserverPod0.Spec.HostNetwork = true
  2658  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2659  
  2660  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  2661  		framework.ExpectNoError(err)
  2662  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2663  
  2664  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {endpointPort}})
  2665  
  2666  		ginkgo.By("Creating 2 pause pods that will try to connect to the webserver")
  2667  		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
  2668  		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2669  
  2670  		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
  2671  		framework.ExpectNoError(err)
  2672  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2673  
  2674  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  2675  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  2676  
  2677  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  2678  		framework.ExpectNoError(err)
  2679  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
  2680  
  2681  		// assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout
  2682  		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
  2683  		for i := 0; i < 5; i++ {
  2684  			// the first pause pod should be on the same node as the webserver, so it can connect to the local pod using clusterIP
  2685  			// note that the expected hostname is the node name because the backend pod is on host network
  2686  			execHostnameTest(*pausePod0, serviceAddress, node0.Name)
  2687  
  2688  			// the second pause pod is on a different node, so it should see a connection error every time
  2689  			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
  2690  			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2691  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
  2692  		}
  2693  
  2694  		ginkgo.By("Creating 2 pause hostNetwork pods that will try to connect to the webserver")
  2695  		pausePod2 := e2epod.NewAgnhostPod(ns, "pause-pod-2", nil, nil, nil)
  2696  		pausePod2.Spec.HostNetwork = true
  2697  		e2epod.SetNodeSelection(&pausePod2.Spec, e2epod.NodeSelection{Name: node0.Name})
  2698  
  2699  		pausePod2, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod2, metav1.CreateOptions{})
  2700  		framework.ExpectNoError(err)
  2701  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod2.Name, f.Namespace.Name, framework.PodStartTimeout))
  2702  
  2703  		pausePod3 := e2epod.NewAgnhostPod(ns, "pause-pod-3", nil, nil, nil)
  2704  		pausePod3.Spec.HostNetwork = true
  2705  		e2epod.SetNodeSelection(&pausePod3.Spec, e2epod.NodeSelection{Name: node1.Name})
  2706  
  2707  		pausePod3, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod3, metav1.CreateOptions{})
  2708  		framework.ExpectNoError(err)
  2709  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod3.Name, f.Namespace.Name, framework.PodStartTimeout))
  2710  
  2711  		// assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout
  2712  		for i := 0; i < 5; i++ {
  2713  			// the first pause pod should be on the same node as the webserver, so it can connect to the local pod using clusterIP
  2714  			// note that the expected hostname is the node name because the backend pod is on host network
  2715  			execHostnameTest(*pausePod2, serviceAddress, node0.Name)
  2716  
  2717  			// the second pause pod is on a different node, so it should see a connection error every time
  2718  			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
  2719  			_, err := e2eoutput.RunHostCmd(pausePod3.Namespace, pausePod3.Name, cmd)
  2720  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
  2721  		}
  2722  	})
  2723  
  2724  	ginkgo.It("should fail health check node port if there are only terminating endpoints", func(ctx context.Context) {
  2725  		// windows kube-proxy does not support this feature yet
  2726  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  2727  
  2728  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  2729  		framework.ExpectNoError(err)
  2730  		nodeCounts := len(nodes.Items)
  2731  		if nodeCounts < 2 {
  2732  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  2733  		}
  2734  		node0 := nodes.Items[0]
  2735  
  2736  		serviceName := "svc-proxy-terminating"
  2737  		ns := f.Namespace.Name
  2738  		servicePort := 80
  2739  
  2740  		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns)
  2741  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  2742  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  2743  			svc.Spec.Ports = []v1.ServicePort{
  2744  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
  2745  			}
  2746  			svc.Spec.Type = v1.ServiceTypeLoadBalancer
  2747  			svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
  2748  		})
  2749  		framework.ExpectNoError(err)
  2750  
  2751  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  2752  		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
  2753  		webserverPod0.Labels = jig.Labels
  2754  		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
  2755  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2756  
  2757  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  2758  		framework.ExpectNoError(err)
  2759  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2760  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  2761  
  2762  		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
  2763  		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2764  
  2765  		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
  2766  		framework.ExpectNoError(err)
  2767  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2768  
  2769  		nodeIPs := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
  2770  		healthCheckNodePortAddr := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.HealthCheckNodePort)))
  2771  		// validate that the health check node port from kube-proxy returns 200 when there are ready endpoints
  2772  		err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
  2773  			cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --max-time 5 http://%s/healthz`, healthCheckNodePortAddr)
  2774  			out, err := e2eoutput.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
  2775  			if err != nil {
  2776  				framework.Logf("unexpected error trying to connect to nodeport %s : %v", healthCheckNodePortAddr, err)
  2777  				return false, nil
  2778  			}
  2779  
  2780  			expectedOut := "200"
  2781  			if out != expectedOut {
  2782  				framework.Logf("expected output: %s , got %s", expectedOut, out)
  2783  				return false, nil
  2784  			}
  2785  			return true, nil
  2786  		})
  2787  		framework.ExpectNoError(err)
  2788  
  2789  		// webserver should continue to serve traffic through the Service after deletion, even though the health check node port should return 503
  2790  		ginkgo.By("Terminating the webserver pod")
  2791  		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
  2792  		framework.ExpectNoError(err)
  2793  
  2794  		// validate that the health check node port from kube-proxy returns 503 when there are no ready endpoints
  2795  		err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
  2796  			cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --max-time 5 http://%s/healthz`, healthCheckNodePortAddr)
  2797  			out, err := e2eoutput.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
  2798  			if err != nil {
  2799  				framework.Logf("unexpected error trying to connect to nodeport %s : %v", healthCheckNodePortAddr, err)
  2800  				return false, nil
  2801  			}
  2802  
  2803  			expectedOut := "503"
  2804  			if out != expectedOut {
  2805  				framework.Logf("expected output: %s , got %s", expectedOut, out)
  2806  				return false, nil
  2807  			}
  2808  			return true, nil
  2809  		})
  2810  		framework.ExpectNoError(err)
  2811  
  2812  		// also verify that while health check node port indicates 0 endpoints and returns 503, the endpoint still serves traffic.
  2813  		nodePortAddress := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
  2814  		execHostnameTest(*pausePod0, nodePortAddress, webserverPod0.Name)
  2815  	})
  2816  
  2817  	ginkgo.It("should fallback to terminating endpoints when there are no ready endpoints with internalTrafficPolicy=Cluster", func(ctx context.Context) {
  2818  		// windows kube-proxy does not support this feature yet
  2819  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  2820  
  2821  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  2822  		framework.ExpectNoError(err)
  2823  		nodeCounts := len(nodes.Items)
  2824  		if nodeCounts < 2 {
  2825  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  2826  		}
  2827  		node0 := nodes.Items[0]
  2828  		node1 := nodes.Items[1]
  2829  
  2830  		serviceName := "svc-proxy-terminating"
  2831  		ns := f.Namespace.Name
  2832  		servicePort := 80
  2833  
  2834  		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns)
  2835  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  2836  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  2837  			svc.Spec.Ports = []v1.ServicePort{
  2838  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
  2839  			}
  2840  		})
  2841  		framework.ExpectNoError(err)
  2842  
  2843  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  2844  		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
  2845  		webserverPod0.Labels = jig.Labels
  2846  		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
  2847  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2848  
  2849  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  2850  		framework.ExpectNoError(err)
  2851  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2852  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  2853  
  2854  		ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
  2855  		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
  2856  		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2857  
  2858  		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
  2859  		framework.ExpectNoError(err)
  2860  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2861  
  2862  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  2863  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  2864  
  2865  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  2866  		framework.ExpectNoError(err)
  2867  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
  2868  
  2869  		// webserver should continue to serve traffic through the Service after delete since:
  2870  		//  - it has a 100s termination grace period
  2871  		//  - it is the only ready endpoint
  2872  		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
  2873  		framework.ExpectNoError(err)
  2874  
  2875  		// assert 5 times that both the local and remote pod can connect to the Service while all endpoints are terminating
  2876  		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
  2877  		for i := 0; i < 5; i++ {
  2878  			// There's a Service with internalTrafficPolicy=Cluster,
  2879  			// with a single endpoint (which is terminating) called webserver0 running on node0.
  2880  			// pausePod0 and pausePod1 are on node0 and node1 respectively.
  2881  			// pausePod0 -> Service clusterIP succeeds because traffic policy is "Cluster"
  2882  			// pausePod1 -> Service clusterIP succeeds because traffic policy is "Cluster"
  2883  			execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
  2884  			execHostnameTest(*pausePod1, serviceAddress, webserverPod0.Name)
  2885  
  2886  			time.Sleep(5 * time.Second)
  2887  		}
  2888  	})
  2889  
  2890  	ginkgo.It("should fallback to local terminating endpoints when there are no ready endpoints with internalTrafficPolicy=Local", func(ctx context.Context) {
  2891  		// windows kube-proxy does not support this feature yet
  2892  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  2893  
  2894  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  2895  		framework.ExpectNoError(err)
  2896  		nodeCounts := len(nodes.Items)
  2897  		if nodeCounts < 2 {
  2898  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  2899  		}
  2900  		node0 := nodes.Items[0]
  2901  		node1 := nodes.Items[1]
  2902  
  2903  		serviceName := "svc-proxy-terminating"
  2904  		ns := f.Namespace.Name
  2905  		servicePort := 80
  2906  
  2907  		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns)
  2908  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  2909  		local := v1.ServiceInternalTrafficPolicyLocal
  2910  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  2911  			svc.Spec.Ports = []v1.ServicePort{
  2912  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
  2913  			}
  2914  			svc.Spec.InternalTrafficPolicy = &local
  2915  		})
  2916  		framework.ExpectNoError(err)
  2917  
  2918  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  2919  		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
  2920  		webserverPod0.Labels = jig.Labels
  2921  		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
  2922  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2923  
  2924  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  2925  		framework.ExpectNoError(err)
  2926  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2927  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  2928  
  2929  		ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
  2930  		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
  2931  		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  2932  
  2933  		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
  2934  		framework.ExpectNoError(err)
  2935  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  2936  
  2937  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  2938  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  2939  
  2940  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  2941  		framework.ExpectNoError(err)
  2942  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
  2943  
  2944  		// webserver should continue to serve traffic through the Service after delete since:
  2945  		//  - it has a 100s termination grace period
  2946  		//  - it is the only ready endpoint
  2947  		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
  2948  		framework.ExpectNoError(err)
  2949  
  2950  		// assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout
  2951  		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
  2952  		for i := 0; i < 5; i++ {
  2953  			// There's a Service with internalTrafficPolicy=Local,
  2954  			// with a single endpoint (which is terminating) called webserver0 running on node0.
  2955  			// pausePod0 and pausePod1 are on node0 and node1 respectively.
  2956  			// pausePod0 -> Service clusterIP succeeds because webserver0 is running on node0 and traffic policy is "Local"
  2957  			// pausePod1 -> Service clusterIP fails because webserver0 is on a different node and traffic policy is "Local"
  2958  			execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
  2959  
  2960  			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
  2961  			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
  2962  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
  2963  
  2964  			time.Sleep(5 * time.Second)
  2965  		}
  2966  	})
  2967  
  2968  	ginkgo.It("should fallback to terminating endpoints when there are no ready endpoints with externallTrafficPolicy=Cluster", func(ctx context.Context) {
  2969  		// windows kube-proxy does not support this feature yet
  2970  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  2971  
  2972  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  2973  		framework.ExpectNoError(err)
  2974  		nodeCounts := len(nodes.Items)
  2975  		if nodeCounts < 2 {
  2976  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  2977  		}
  2978  		node0 := nodes.Items[0]
  2979  		node1 := nodes.Items[1]
  2980  
  2981  		serviceName := "svc-proxy-terminating"
  2982  		ns := f.Namespace.Name
  2983  		servicePort := 80
  2984  
  2985  		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns)
  2986  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  2987  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  2988  			svc.Spec.Ports = []v1.ServicePort{
  2989  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
  2990  			}
  2991  			svc.Spec.Type = v1.ServiceTypeNodePort
  2992  		})
  2993  		framework.ExpectNoError(err)
  2994  
  2995  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  2996  		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
  2997  		webserverPod0.Labels = jig.Labels
  2998  		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
  2999  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  3000  
  3001  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  3002  		framework.ExpectNoError(err)
  3003  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  3004  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  3005  
  3006  		ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
  3007  		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
  3008  		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  3009  
  3010  		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
  3011  		framework.ExpectNoError(err)
  3012  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  3013  
  3014  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  3015  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  3016  
  3017  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  3018  		framework.ExpectNoError(err)
  3019  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
  3020  
  3021  		// webserver should continue to serve traffic through the Service after delete since:
  3022  		//  - it has a 100s termination grace period
  3023  		//  - it is the only ready endpoint
  3024  		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
  3025  		framework.ExpectNoError(err)
  3026  
  3027  		// assert 5 times that both the local and remote pod can connect to the Service NodePort while all endpoints are terminating
  3028  		nodeIPs := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
  3029  		nodePortAddress := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
  3030  		for i := 0; i < 5; i++ {
  3031  			// There's a Service Type=NodePort with externalTrafficPolicy=Cluster,
  3032  			// with a single endpoint (which is terminating) called webserver0 running on node0.
  3033  			// pausePod0 and pausePod1 are on node0 and node1 respectively.
  3034  			// pausePod0 -> node0 node port succeeds because webserver0 is running on node0 and traffic policy is "Cluster"
  3035  			// pausePod1 -> node0 node port succeeds because webserver0 is running on node0 and traffic policy is "Cluster"
  3036  			execHostnameTest(*pausePod0, nodePortAddress, webserverPod0.Name)
  3037  			execHostnameTest(*pausePod1, nodePortAddress, webserverPod0.Name)
  3038  
  3039  			time.Sleep(5 * time.Second)
  3040  		}
  3041  	})
  3042  
  3043  	ginkgo.It("should fallback to local terminating endpoints when there are no ready endpoints with externalTrafficPolicy=Local", func(ctx context.Context) {
  3044  		// windows kube-proxy does not support this feature yet
  3045  		e2eskipper.SkipIfNodeOSDistroIs("windows")
  3046  
  3047  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  3048  		framework.ExpectNoError(err)
  3049  		nodeCounts := len(nodes.Items)
  3050  		if nodeCounts < 2 {
  3051  			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
  3052  		}
  3053  		node0 := nodes.Items[0]
  3054  		node1 := nodes.Items[1]
  3055  
  3056  		serviceName := "svc-proxy-terminating"
  3057  		ns := f.Namespace.Name
  3058  		servicePort := 80
  3059  
  3060  		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating" + ns)
  3061  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  3062  		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
  3063  			svc.Spec.Ports = []v1.ServicePort{
  3064  				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
  3065  			}
  3066  			svc.Spec.Type = v1.ServiceTypeNodePort
  3067  			svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
  3068  		})
  3069  		framework.ExpectNoError(err)
  3070  
  3071  		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
  3072  		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
  3073  		webserverPod0.Labels = jig.Labels
  3074  		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
  3075  		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  3076  
  3077  		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
  3078  		framework.ExpectNoError(err)
  3079  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  3080  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
  3081  
  3082  		ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
  3083  		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
  3084  		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
  3085  
  3086  		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
  3087  		framework.ExpectNoError(err)
  3088  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
  3089  
  3090  		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
  3091  		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
  3092  
  3093  		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
  3094  		framework.ExpectNoError(err)
  3095  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
  3096  
  3097  		// webserver should continue to serve traffic through the Service after delete since:
  3098  		//  - it has a 100s termination grace period
  3099  		//  - it is the only ready endpoint
  3100  		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
  3101  		framework.ExpectNoError(err)
  3102  
  3103  		// assert 5 times that the first pause pod can connect to the Service locally and the second one errors with a timeout
  3104  		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
  3105  		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
  3106  		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
  3107  		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
  3108  		for i := 0; i < 5; i++ {
  3109  			// There's a Service Type=NodePort with externalTrafficPolicy=Local,
  3110  			// with a single endpoint (which is terminating) called webserver0 running on node0.
  3111  			// pausePod0 and pausePod1 are on node0 and node1 respectively.
  3112  			// pausePod0 -> node1 node port fails because it's "external" and there are no local endpoints
  3113  			// pausePod1 -> node0 node port succeeds because webserver0 is running on node0
  3114  			// pausePod0 -> node0 node port succeeds because webserver0 is running on node0
  3115  			//
  3116  			// NOTE: pausePod1 -> node1 will succeed for kube-proxy because kube-proxy considers pod-to-same-node-NodePort
  3117  			// connections as neither internal nor external and always get Cluster traffic policy. However, we do not test
  3118  			// this here because not all Network implementations follow kube-proxy's interpretation of "destination"
  3119  			// traffic policy. See: https://github.com/kubernetes/kubernetes/pull/123622
  3120  			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
  3121  			_, err := e2eoutput.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
  3122  			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to node port for pausePod0")
  3123  
  3124  			execHostnameTest(*pausePod0, nodePortAddress0, webserverPod0.Name)
  3125  			execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
  3126  
  3127  			time.Sleep(5 * time.Second)
  3128  		}
  3129  	})
  3130  
  3131  	/*
  3132  	   Release: v1.18
  3133  	   Testname: Find Kubernetes Service in default Namespace
  3134  	   Description: List all Services in all Namespaces, response MUST include a Service named Kubernetes with the Namespace of default.
  3135  	*/
  3136  	framework.ConformanceIt("should find a service from listing all namespaces", func(ctx context.Context) {
  3137  		ginkgo.By("fetching services")
  3138  		svcs, _ := f.ClientSet.CoreV1().Services("").List(ctx, metav1.ListOptions{})
  3139  
  3140  		foundSvc := false
  3141  		for _, svc := range svcs.Items {
  3142  			if svc.ObjectMeta.Name == "kubernetes" && svc.ObjectMeta.Namespace == "default" {
  3143  				foundSvc = true
  3144  				break
  3145  			}
  3146  		}
  3147  
  3148  		if !foundSvc {
  3149  			framework.Fail("could not find service 'kubernetes' in service list in all namespaces")
  3150  		}
  3151  	})
  3152  
  3153  	/*
  3154  	   Release: v1.19
  3155  	   Testname: Endpoint resource lifecycle
  3156  	   Description: Create an endpoint, the endpoint MUST exist.
  3157  	   The endpoint is updated with a new label, a check after the update MUST find the changes.
	   The endpoint is then patched with a new IPv4 address and port, a check after the patch MUST find the changes.
	   The endpoint is deleted by its label, a watch listens for the deleted watch event.
  3160  	*/
  3161  	framework.ConformanceIt("should test the lifecycle of an Endpoint", func(ctx context.Context) {
  3162  		testNamespaceName := f.Namespace.Name
  3163  		testEndpointName := "testservice"
  3164  		testEndpoints := v1.Endpoints{
  3165  			ObjectMeta: metav1.ObjectMeta{
  3166  				Name: testEndpointName,
  3167  				Labels: map[string]string{
  3168  					"test-endpoint-static": "true",
  3169  				},
  3170  			},
  3171  			Subsets: []v1.EndpointSubset{{
  3172  				Addresses: []v1.EndpointAddress{{
  3173  					IP: "10.0.0.24",
  3174  				}},
  3175  				Ports: []v1.EndpointPort{{
  3176  					Name:     "http",
  3177  					Port:     80,
  3178  					Protocol: v1.ProtocolTCP,
  3179  				}},
  3180  			}},
  3181  		}
  3182  		w := &cache.ListWatch{
  3183  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  3184  				options.LabelSelector = "test-endpoint-static=true"
  3185  				return f.ClientSet.CoreV1().Endpoints(testNamespaceName).Watch(ctx, options)
  3186  			},
  3187  		}
  3188  		endpointsList, err := f.ClientSet.CoreV1().Endpoints("").List(ctx, metav1.ListOptions{LabelSelector: "test-endpoint-static=true"})
  3189  		framework.ExpectNoError(err, "failed to list Endpoints")
  3190  
  3191  		ginkgo.By("creating an Endpoint")
  3192  		_, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Create(ctx, &testEndpoints, metav1.CreateOptions{})
  3193  		framework.ExpectNoError(err, "failed to create Endpoint")
  3194  		ginkgo.By("waiting for available Endpoint")
  3195  		ctxUntil, cancel := context.WithTimeout(ctx, 30*time.Second)
  3196  		defer cancel()
  3197  		_, err = watchtools.Until(ctxUntil, endpointsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3198  			switch event.Type {
  3199  			case watch.Added:
  3200  				if endpoints, ok := event.Object.(*v1.Endpoints); ok {
  3201  					found := endpoints.ObjectMeta.Name == endpoints.Name &&
  3202  						endpoints.Labels["test-endpoint-static"] == "true"
  3203  					return found, nil
  3204  				}
  3205  			default:
  3206  				framework.Logf("observed event type %v", event.Type)
  3207  			}
  3208  			return false, nil
  3209  		})
  3210  		framework.ExpectNoError(err, "failed to see %v event", watch.Added)
  3211  
  3212  		ginkgo.By("listing all Endpoints")
  3213  		endpointsList, err = f.ClientSet.CoreV1().Endpoints("").List(ctx, metav1.ListOptions{LabelSelector: "test-endpoint-static=true"})
  3214  		framework.ExpectNoError(err, "failed to list Endpoints")
  3215  		eventFound := false
  3216  		var foundEndpoint v1.Endpoints
  3217  		for _, endpoint := range endpointsList.Items {
  3218  			if endpoint.ObjectMeta.Name == testEndpointName && endpoint.ObjectMeta.Namespace == testNamespaceName {
  3219  				eventFound = true
  3220  				foundEndpoint = endpoint
  3221  				break
  3222  			}
  3223  		}
  3224  		if !eventFound {
  3225  			framework.Fail("unable to find Endpoint Service in list of Endpoints")
  3226  		}
  3227  
  3228  		ginkgo.By("updating the Endpoint")
  3229  		foundEndpoint.ObjectMeta.Labels["test-service"] = "updated"
  3230  		_, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Update(ctx, &foundEndpoint, metav1.UpdateOptions{})
  3231  		framework.ExpectNoError(err, "failed to update Endpoint with new label")
  3232  
  3233  		ctxUntil, cancel = context.WithTimeout(ctx, 30*time.Second)
  3234  		defer cancel()
  3235  		_, err = watchtools.Until(ctxUntil, endpointsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3236  			switch event.Type {
  3237  			case watch.Modified:
  3238  				if endpoints, ok := event.Object.(*v1.Endpoints); ok {
  3239  					found := endpoints.ObjectMeta.Name == endpoints.Name &&
  3240  						endpoints.Labels["test-endpoint-static"] == "true"
  3241  					return found, nil
  3242  				}
  3243  			default:
  3244  				framework.Logf("observed event type %v", event.Type)
  3245  			}
  3246  			return false, nil
  3247  		})
  3248  		framework.ExpectNoError(err, "failed to see %v event", watch.Modified)
  3249  
  3250  		ginkgo.By("fetching the Endpoint")
  3251  		endpoints, err := f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(ctx, testEndpointName, metav1.GetOptions{})
  3252  		framework.ExpectNoError(err, "failed to fetch Endpoint")
  3253  		gomega.Expect(foundEndpoint.ObjectMeta.Labels).To(gomega.HaveKeyWithValue("test-service", "updated"), "failed to update Endpoint %v in namespace %v label not updated", testEndpointName, testNamespaceName)
  3254  
  3255  		endpointPatch, err := json.Marshal(map[string]interface{}{
  3256  			"metadata": map[string]interface{}{
  3257  				"labels": map[string]string{
  3258  					"test-service": "patched",
  3259  				},
  3260  			},
  3261  			"subsets": []map[string]interface{}{
  3262  				{
  3263  					"addresses": []map[string]string{
  3264  						{
  3265  							"ip": "10.0.0.25",
  3266  						},
  3267  					},
  3268  					"ports": []map[string]interface{}{
  3269  						{
  3270  							"name": "http-test",
  3271  							"port": int32(8080),
  3272  						},
  3273  					},
  3274  				},
  3275  			},
  3276  		})
  3277  		framework.ExpectNoError(err, "failed to marshal JSON for WatchEvent patch")
  3278  		ginkgo.By("patching the Endpoint")
  3279  		_, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Patch(ctx, testEndpointName, types.StrategicMergePatchType, []byte(endpointPatch), metav1.PatchOptions{})
  3280  		framework.ExpectNoError(err, "failed to patch Endpoint")
  3281  		ctxUntil, cancel = context.WithTimeout(ctx, 30*time.Second)
  3282  		defer cancel()
  3283  		_, err = watchtools.Until(ctxUntil, endpoints.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3284  			switch event.Type {
  3285  			case watch.Modified:
  3286  				if endpoints, ok := event.Object.(*v1.Endpoints); ok {
  3287  					found := endpoints.ObjectMeta.Name == endpoints.Name &&
  3288  						endpoints.Labels["test-endpoint-static"] == "true"
  3289  					return found, nil
  3290  				}
  3291  			default:
  3292  				framework.Logf("observed event type %v", event.Type)
  3293  			}
  3294  			return false, nil
  3295  		})
  3296  		framework.ExpectNoError(err, "failed to see %v event", watch.Modified)
  3297  
  3298  		ginkgo.By("fetching the Endpoint")
  3299  		endpoints, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(ctx, testEndpointName, metav1.GetOptions{})
  3300  		framework.ExpectNoError(err, "failed to fetch Endpoint")
  3301  		gomega.Expect(endpoints.ObjectMeta.Labels).To(gomega.HaveKeyWithValue("test-service", "patched"), "failed to patch Endpoint with Label")
  3302  		endpointSubsetOne := endpoints.Subsets[0]
  3303  		endpointSubsetOneAddresses := endpointSubsetOne.Addresses[0]
  3304  		endpointSubsetOnePorts := endpointSubsetOne.Ports[0]
  3305  		gomega.Expect(endpointSubsetOneAddresses.IP).To(gomega.Equal("10.0.0.25"), "failed to patch Endpoint")
  3306  		gomega.Expect(endpointSubsetOnePorts.Name).To(gomega.Equal("http-test"), "failed to patch Endpoint")
  3307  		gomega.Expect(endpointSubsetOnePorts.Port).To(gomega.Equal(int32(8080)), "failed to patch Endpoint")
  3308  
  3309  		ginkgo.By("deleting the Endpoint by Collection")
  3310  		err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "test-endpoint-static=true"})
  3311  		framework.ExpectNoError(err, "failed to delete Endpoint by Collection")
  3312  
  3313  		ginkgo.By("waiting for Endpoint deletion")
  3314  		ctxUntil, cancel = context.WithTimeout(ctx, 30*time.Second)
  3315  		defer cancel()
  3316  		_, err = watchtools.Until(ctxUntil, endpoints.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3317  			switch event.Type {
  3318  			case watch.Deleted:
  3319  				if endpoints, ok := event.Object.(*v1.Endpoints); ok {
  3320  					found := endpoints.ObjectMeta.Name == endpoints.Name &&
  3321  						endpoints.Labels["test-endpoint-static"] == "true"
  3322  					return found, nil
  3323  				}
  3324  			default:
  3325  				framework.Logf("observed event type %v", event.Type)
  3326  			}
  3327  			return false, nil
  3328  		})
  3329  		framework.ExpectNoError(err, "failed to see %v event", watch.Deleted)
  3330  
  3331  		ginkgo.By("fetching the Endpoint")
  3332  		_, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(ctx, testEndpointName, metav1.GetOptions{})
  3333  		gomega.Expect(err).To(gomega.HaveOccurred(), "should not be able to fetch Endpoint")
  3334  	})
  3335  
  3336  	/*
  3337  		Release: v1.21
  3338  		Testname: Service, complete ServiceStatus lifecycle
  3339  		Description: Create a service, the service MUST exist.
  3340  		When retrieving /status the action MUST be validated.
  3341  		When patching /status the action MUST be validated.
  3342  		When updating /status the action MUST be validated.
  3343  		When patching a service the action MUST be validated.
  3344  	*/
  3345  	framework.ConformanceIt("should complete a service status lifecycle", func(ctx context.Context) {
  3346  
  3347  		ns := f.Namespace.Name
  3348  		svcResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
  3349  		svcClient := f.ClientSet.CoreV1().Services(ns)
  3350  
  3351  		testSvcName := "test-service-" + utilrand.String(5)
  3352  		testSvcLabels := map[string]string{"test-service-static": "true"}
  3353  		testSvcLabelsFlat := "test-service-static=true"
  3354  
  3355  		w := &cache.ListWatch{
  3356  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  3357  				options.LabelSelector = testSvcLabelsFlat
  3358  				return cs.CoreV1().Services(ns).Watch(ctx, options)
  3359  			},
  3360  		}
  3361  
  3362  		svcList, err := cs.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: testSvcLabelsFlat})
  3363  		framework.ExpectNoError(err, "failed to list Services")
  3364  
  3365  		ginkgo.By("creating a Service")
  3366  		testService := v1.Service{
  3367  			ObjectMeta: metav1.ObjectMeta{
  3368  				Name:   testSvcName,
  3369  				Labels: testSvcLabels,
  3370  			},
  3371  			Spec: v1.ServiceSpec{
  3372  				Type: "LoadBalancer",
  3373  				Ports: []v1.ServicePort{{
  3374  					Name:       "http",
  3375  					Protocol:   v1.ProtocolTCP,
  3376  					Port:       int32(80),
  3377  					TargetPort: intstr.FromInt32(80),
  3378  				}},
  3379  				LoadBalancerClass: utilpointer.String("example.com/internal-vip"),
  3380  			},
  3381  		}
  3382  		_, err = cs.CoreV1().Services(ns).Create(ctx, &testService, metav1.CreateOptions{})
  3383  		framework.ExpectNoError(err, "failed to create Service")
  3384  
  3385  		ginkgo.By("watching for the Service to be added")
  3386  		ctxUntil, cancel := context.WithTimeout(ctx, svcReadyTimeout)
  3387  		defer cancel()
  3388  		_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3389  			if svc, ok := event.Object.(*v1.Service); ok {
  3390  				found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
  3391  					svc.ObjectMeta.Namespace == ns &&
  3392  					svc.Labels["test-service-static"] == "true"
  3393  				if !found {
  3394  					framework.Logf("observed Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports)
  3395  					return false, nil
  3396  				}
  3397  				framework.Logf("Found Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports)
  3398  				return found, nil
  3399  			}
  3400  			framework.Logf("Observed event: %+v", event.Object)
  3401  			return false, nil
  3402  		})
  3403  		framework.ExpectNoError(err, "Failed to locate Service %v in namespace %v", testService.ObjectMeta.Name, ns)
  3404  		framework.Logf("Service %s created", testSvcName)
  3405  
  3406  		ginkgo.By("Getting /status")
  3407  		svcStatusUnstructured, err := f.DynamicClient.Resource(svcResource).Namespace(ns).Get(ctx, testSvcName, metav1.GetOptions{}, "status")
  3408  		framework.ExpectNoError(err, "Failed to fetch ServiceStatus of Service %s in namespace %s", testSvcName, ns)
  3409  		svcStatusBytes, err := json.Marshal(svcStatusUnstructured)
  3410  		framework.ExpectNoError(err, "Failed to marshal unstructured response. %v", err)
  3411  
  3412  		var svcStatus v1.Service
  3413  		err = json.Unmarshal(svcStatusBytes, &svcStatus)
  3414  		framework.ExpectNoError(err, "Failed to unmarshal JSON bytes to a Service object type")
  3415  		framework.Logf("Service %s has LoadBalancer: %v", testSvcName, svcStatus.Status.LoadBalancer)
  3416  
  3417  		ginkgo.By("patching the ServiceStatus")
  3418  		lbStatus := v1.LoadBalancerStatus{
  3419  			Ingress: []v1.LoadBalancerIngress{{IP: "203.0.113.1"}},
  3420  		}
  3421  		lbStatusJSON, err := json.Marshal(lbStatus)
  3422  		framework.ExpectNoError(err, "Failed to marshal JSON. %v", err)
  3423  		_, err = svcClient.Patch(ctx, testSvcName, types.MergePatchType,
  3424  			[]byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":{"loadBalancer":`+string(lbStatusJSON)+`}}`),
  3425  			metav1.PatchOptions{}, "status")
  3426  		framework.ExpectNoError(err, "Could not patch service status", err)
  3427  
  3428  		ginkgo.By("watching for the Service to be patched")
  3429  		ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
  3430  		defer cancel()
  3431  
  3432  		_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3433  			if svc, ok := event.Object.(*v1.Service); ok {
  3434  				found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
  3435  					svc.ObjectMeta.Namespace == ns &&
  3436  					svc.Annotations["patchedstatus"] == "true"
  3437  				if !found {
  3438  					framework.Logf("observed Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
  3439  					return false, nil
  3440  				}
  3441  				framework.Logf("Found Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
  3442  				return found, nil
  3443  			}
  3444  			framework.Logf("Observed event: %+v", event.Object)
  3445  			return false, nil
  3446  		})
  3447  		framework.ExpectNoError(err, "failed to locate Service %v in namespace %v", testService.ObjectMeta.Name, ns)
  3448  		framework.Logf("Service %s has service status patched", testSvcName)
  3449  
  3450  		ginkgo.By("updating the ServiceStatus")
  3451  
  3452  		var statusToUpdate, updatedStatus *v1.Service
  3453  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
  3454  			statusToUpdate, err = svcClient.Get(ctx, testSvcName, metav1.GetOptions{})
  3455  			framework.ExpectNoError(err, "Unable to retrieve service %s", testSvcName)
  3456  
  3457  			statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, metav1.Condition{
  3458  				Type:    "StatusUpdate",
  3459  				Status:  metav1.ConditionTrue,
  3460  				Reason:  "E2E",
  3461  				Message: "Set from e2e test",
  3462  			})
  3463  
  3464  			updatedStatus, err = svcClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
  3465  			return err
  3466  		})
  3467  		framework.ExpectNoError(err, "\n\n Failed to UpdateStatus. %v\n\n", err)
  3468  		framework.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions)
  3469  
  3470  		ginkgo.By("watching for the Service to be updated")
  3471  		ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
  3472  		defer cancel()
  3473  		_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3474  			if svc, ok := event.Object.(*v1.Service); ok {
  3475  				found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
  3476  					svc.ObjectMeta.Namespace == ns &&
  3477  					svc.Annotations["patchedstatus"] == "true"
  3478  				if !found {
  3479  					framework.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
  3480  					return false, nil
  3481  				}
  3482  				for _, cond := range svc.Status.Conditions {
  3483  					if cond.Type == "StatusUpdate" &&
  3484  						cond.Reason == "E2E" &&
  3485  						cond.Message == "Set from e2e test" {
  3486  						framework.Logf("Found Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.Conditions)
  3487  						return found, nil
  3488  					} else {
  3489  						framework.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
  3490  						return false, nil
  3491  					}
  3492  				}
  3493  			}
  3494  			framework.Logf("Observed event: %+v", event.Object)
  3495  			return false, nil
  3496  		})
  3497  		framework.ExpectNoError(err, "failed to locate Service %v in namespace %v", testService.ObjectMeta.Name, ns)
  3498  		framework.Logf("Service %s has service status updated", testSvcName)
  3499  
  3500  		ginkgo.By("patching the service")
  3501  		servicePatchPayload, err := json.Marshal(v1.Service{
  3502  			ObjectMeta: metav1.ObjectMeta{
  3503  				Labels: map[string]string{
  3504  					"test-service": "patched",
  3505  				},
  3506  			},
  3507  		})
  3508  
  3509  		_, err = svcClient.Patch(ctx, testSvcName, types.StrategicMergePatchType, []byte(servicePatchPayload), metav1.PatchOptions{})
  3510  		framework.ExpectNoError(err, "failed to patch service. %v", err)
  3511  
  3512  		ginkgo.By("watching for the Service to be patched")
  3513  		ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
  3514  		defer cancel()
  3515  		_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3516  			if svc, ok := event.Object.(*v1.Service); ok {
  3517  				found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
  3518  					svc.ObjectMeta.Namespace == ns &&
  3519  					svc.Labels["test-service"] == "patched"
  3520  				if !found {
  3521  					framework.Logf("observed Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels)
  3522  					return false, nil
  3523  				}
  3524  				framework.Logf("Found Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels)
  3525  				return found, nil
  3526  			}
  3527  			framework.Logf("Observed event: %+v", event.Object)
  3528  			return false, nil
  3529  		})
  3530  		framework.ExpectNoError(err, "failed to locate Service %v in namespace %v", testService.ObjectMeta.Name, ns)
  3531  		framework.Logf("Service %s patched", testSvcName)
  3532  
  3533  		ginkgo.By("deleting the service")
  3534  		err = cs.CoreV1().Services(ns).Delete(ctx, testSvcName, metav1.DeleteOptions{})
  3535  		framework.ExpectNoError(err, "failed to delete the Service. %v", err)
  3536  
  3537  		ginkgo.By("watching for the Service to be deleted")
  3538  		ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
  3539  		defer cancel()
  3540  		_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
  3541  			switch event.Type {
  3542  			case watch.Deleted:
  3543  				if svc, ok := event.Object.(*v1.Service); ok {
  3544  					found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
  3545  						svc.ObjectMeta.Namespace == ns &&
  3546  						svc.Labels["test-service-static"] == "true"
  3547  					if !found {
  3548  						framework.Logf("observed Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations)
  3549  						return false, nil
  3550  					}
  3551  					framework.Logf("Found Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations)
  3552  					return found, nil
  3553  				}
  3554  			default:
  3555  				framework.Logf("Observed event: %+v", event.Type)
  3556  			}
  3557  			return false, nil
  3558  		})
  3559  		framework.ExpectNoError(err, "failed to delete Service %v in namespace %v", testService.ObjectMeta.Name, ns)
  3560  		framework.Logf("Service %s deleted", testSvcName)
  3561  	})
  3562  
  3563  	/*
  3564  		Release: v1.23
  3565  		Testname: Service, deletes a collection of services
  3566  		Description: Create three services with the required
  3567  		labels and ports. It MUST locate three services in the
  3568  		test namespace. It MUST succeed at deleting a collection
  3569  		of services via a label selector. It MUST locate only
  3570  		one service after deleting the service collection.
  3571  	*/
  3572  	framework.ConformanceIt("should delete a collection of services", func(ctx context.Context) {
  3573  
  3574  		ns := f.Namespace.Name
  3575  		svcClient := f.ClientSet.CoreV1().Services(ns)
  3576  		svcResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
  3577  		svcDynamicClient := f.DynamicClient.Resource(svcResource).Namespace(ns)
  3578  
  3579  		svcLabel := map[string]string{"e2e-test-service": "delete"}
  3580  		deleteLabel := labels.SelectorFromSet(svcLabel).String()
  3581  
  3582  		ginkgo.By("creating a collection of services")
  3583  
  3584  		testServices := []struct {
  3585  			name  string
  3586  			label map[string]string
  3587  			port  int
  3588  		}{
  3589  			{
  3590  				name:  "e2e-svc-a-" + utilrand.String(5),
  3591  				label: map[string]string{"e2e-test-service": "delete"},
  3592  				port:  8001,
  3593  			},
  3594  			{
  3595  				name:  "e2e-svc-b-" + utilrand.String(5),
  3596  				label: map[string]string{"e2e-test-service": "delete"},
  3597  				port:  8002,
  3598  			},
  3599  			{
  3600  				name:  "e2e-svc-c-" + utilrand.String(5),
  3601  				label: map[string]string{"e2e-test-service": "keep"},
  3602  				port:  8003,
  3603  			},
  3604  		}
  3605  
  3606  		for _, testService := range testServices {
  3607  			func() {
  3608  				framework.Logf("Creating %s", testService.name)
  3609  
  3610  				svc := v1.Service{
  3611  					ObjectMeta: metav1.ObjectMeta{
  3612  						Name:   testService.name,
  3613  						Labels: testService.label,
  3614  					},
  3615  					Spec: v1.ServiceSpec{
  3616  						Type: "ClusterIP",
  3617  						Ports: []v1.ServicePort{{
  3618  							Name:       "http",
  3619  							Protocol:   v1.ProtocolTCP,
  3620  							Port:       int32(testService.port),
  3621  							TargetPort: intstr.FromInt(testService.port),
  3622  						}},
  3623  					},
  3624  				}
  3625  				_, err := svcClient.Create(ctx, &svc, metav1.CreateOptions{})
  3626  				framework.ExpectNoError(err, "failed to create Service")
  3627  
  3628  			}()
  3629  		}
  3630  
  3631  		svcList, err := cs.CoreV1().Services(ns).List(ctx, metav1.ListOptions{})
  3632  		framework.ExpectNoError(err, "failed to list Services")
  3633  		gomega.Expect(svcList.Items).To(gomega.HaveLen(3), "Required count of services out of sync")
  3634  
  3635  		ginkgo.By("deleting service collection")
  3636  		err = svcDynamicClient.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: deleteLabel})
  3637  		framework.ExpectNoError(err, "failed to delete service collection. %v", err)
  3638  
  3639  		svcList, err = cs.CoreV1().Services(ns).List(ctx, metav1.ListOptions{})
  3640  		framework.ExpectNoError(err, "failed to list Services")
  3641  		gomega.Expect(svcList.Items).To(gomega.HaveLen(1), "Required count of services out of sync")
  3642  
  3643  		framework.Logf("Collection of services has been deleted")
  3644  	})
  3645  
  3646  	/*
  3647  		Release: v1.29
  3648  		Testname: Service, should serve endpoints on same port and different protocols.
  3649  		Description: Create one service with two ports, same port number and different protocol TCP and UDP.
  3650  		It MUST be able to forward traffic to both ports.
  3651  		Update the Service to expose only the TCP port, it MUST succeed to connect to the TCP port and fail
  3652  		to connect to the UDP port.
  3653  		Update the Service to expose only the UDP port, it MUST succeed to connect to the UDP port and fail
  3654  		to connect to the TCP port.
  3655  	*/
  3656  	framework.ConformanceIt("should serve endpoints on same port and different protocols", func(ctx context.Context) {
  3657  		serviceName := "multiprotocol-test"
  3658  		testLabels := map[string]string{"app": "multiport"}
  3659  		ns := f.Namespace.Name
  3660  		containerPort := 80
  3661  
  3662  		svcTCPport := v1.ServicePort{
  3663  			Name:       "tcp-port",
  3664  			Port:       80,
  3665  			TargetPort: intstr.FromInt(containerPort),
  3666  			Protocol:   v1.ProtocolTCP,
  3667  		}
  3668  		svcUDPport := v1.ServicePort{
  3669  			Name:       "udp-port",
  3670  			Port:       80,
  3671  			TargetPort: intstr.FromInt(containerPort),
  3672  			Protocol:   v1.ProtocolUDP,
  3673  		}
  3674  
  3675  		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
  3676  
  3677  		testService := v1.Service{
  3678  			ObjectMeta: metav1.ObjectMeta{
  3679  				Name:   serviceName,
  3680  				Labels: testLabels,
  3681  			},
  3682  			Spec: v1.ServiceSpec{
  3683  				Type:     v1.ServiceTypeClusterIP,
  3684  				Selector: testLabels,
  3685  				Ports:    []v1.ServicePort{svcTCPport, svcUDPport},
  3686  			},
  3687  		}
  3688  		service, err := cs.CoreV1().Services(ns).Create(ctx, &testService, metav1.CreateOptions{})
  3689  		framework.ExpectNoError(err, "failed to create Service")
  3690  
  3691  		containerPorts := []v1.ContainerPort{{
  3692  			Name:          svcTCPport.Name,
  3693  			ContainerPort: int32(containerPort),
  3694  			Protocol:      v1.ProtocolTCP,
  3695  		}, {
  3696  			Name:          svcUDPport.Name,
  3697  			ContainerPort: int32(containerPort),
  3698  			Protocol:      v1.ProtocolUDP,
  3699  		}}
  3700  		podname1 := "pod1"
  3701  		ginkgo.By("creating pod " + podname1 + " in namespace " + ns)
  3702  		createPodOrFail(ctx, f, ns, podname1, testLabels, containerPorts, "netexec", "--http-port", strconv.Itoa(containerPort), "--udp-port", strconv.Itoa(containerPort))
  3703  		validateEndpointsPortsWithProtocolsOrFail(cs, ns, serviceName, fullPortsByPodName{podname1: containerPorts})
  3704  
  3705  		ginkgo.By("Checking if the Service forwards traffic to the TCP and UDP port")
  3706  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  3707  		err = testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolTCP, execPod, 30*time.Second)
  3708  		if err != nil {
  3709  			framework.Failf("Failed to connect to Service TCP port: %v", err)
  3710  		}
  3711  		err = testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolUDP, execPod, 30*time.Second)
  3712  		if err != nil {
  3713  			framework.Failf("Failed to connect to Service UDP port: %v", err)
  3714  		}
  3715  
  3716  		ginkgo.By("Checking if the Service forwards traffic to TCP only")
  3717  		service, err = cs.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
  3718  		if err != nil {
  3719  			framework.Failf("failed to get Service %q: %v", serviceName, err)
  3720  		}
  3721  		service.Spec.Ports = []v1.ServicePort{svcTCPport}
  3722  		_, err = cs.CoreV1().Services(ns).Update(ctx, service, metav1.UpdateOptions{})
  3723  		if err != nil {
  3724  			framework.Failf("failed to get Service %q: %v", serviceName, err)
  3725  		}
  3726  
  3727  		// test reachability
  3728  		err = testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolTCP, execPod, 30*time.Second)
  3729  		if err != nil {
  3730  			framework.Failf("Failed to connect to Service TCP port: %v", err)
  3731  		}
  3732  		// take into account the NetworkProgrammingLatency
  3733  		// testEndpointReachability tries 3 times every 3 second
  3734  		// we retry again during 30 seconds to check if the port stops forwarding
  3735  		gomega.Eventually(ctx, func() error {
  3736  			return testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolUDP, execPod, 6*time.Second)
  3737  		}).WithTimeout(30 * time.Second).WithPolling(5 * time.Second).ShouldNot(gomega.BeNil())
  3738  
  3739  		ginkgo.By("Checking if the Service forwards traffic to UDP only")
  3740  		service, err = cs.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
  3741  		if err != nil {
  3742  			framework.Failf("failed to get Service %q: %v", serviceName, err)
  3743  		}
  3744  		service.Spec.Ports = []v1.ServicePort{svcUDPport}
  3745  		_, err = cs.CoreV1().Services(ns).Update(ctx, service, metav1.UpdateOptions{})
  3746  		if err != nil {
  3747  			framework.Failf("failed to update Service %q: %v", serviceName, err)
  3748  		}
  3749  
  3750  		// test reachability
  3751  		err = testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolUDP, execPod, 30*time.Second)
  3752  		if err != nil {
  3753  			framework.Failf("Failed to connect to Service UDP port: %v", err)
  3754  		}
  3755  		// take into account the NetworkProgrammingLatency
  3756  		// testEndpointReachability tries 3 times every 3 second
  3757  		// we retry again during 30 seconds to check if the port stops forwarding
  3758  		gomega.Eventually(ctx, func() error {
  3759  			return testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolTCP, execPod, 6*time.Second)
  3760  		}).WithTimeout(30 * time.Second).WithPolling(5 * time.Second).ShouldNot(gomega.BeNil())
  3761  	})
  3762  
  3763  	/*
  3764  		Release: v1.26
  3765  		Testname: Service, same ports with different protocols on a Load Balancer Service
  3766  		Description: Create a LoadBalancer service with two ports that have the same value but use different protocols. Add a Pod that listens on both ports. The Pod must be reachable via the ClusterIP and both ports
  3767  	*/
  3768  	ginkgo.It("should serve endpoints on same port and different protocol for internal traffic on Type LoadBalancer ", func(ctx context.Context) {
  3769  		serviceName := "multiprotocol-lb-test"
  3770  		ns := f.Namespace.Name
  3771  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  3772  
  3773  		ginkgo.DeferCleanup(func(ctx context.Context) {
  3774  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  3775  			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
  3776  		})
  3777  
  3778  		svc1port := "svc1"
  3779  		svc2port := "svc2"
  3780  
  3781  		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
  3782  		svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(service *v1.Service) {
  3783  			service.Spec.Ports = []v1.ServicePort{
  3784  				{
  3785  					Name:       "portname1",
  3786  					Port:       80,
  3787  					TargetPort: intstr.FromString(svc1port),
  3788  					Protocol:   v1.ProtocolTCP,
  3789  				},
  3790  				{
  3791  					Name:       "portname2",
  3792  					Port:       80,
  3793  					TargetPort: intstr.FromString(svc2port),
  3794  					Protocol:   v1.ProtocolUDP,
  3795  				},
  3796  			}
  3797  		})
  3798  		framework.ExpectNoError(err)
  3799  
  3800  		containerPort := 100
  3801  
  3802  		names := map[string]bool{}
  3803  		ginkgo.DeferCleanup(func(ctx context.Context) {
  3804  			for name := range names {
  3805  				err := cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
  3806  				framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
  3807  			}
  3808  		})
  3809  
  3810  		containerPorts := []v1.ContainerPort{
  3811  			{
  3812  				Name:          svc1port,
  3813  				ContainerPort: int32(containerPort),
  3814  				Protocol:      v1.ProtocolTCP,
  3815  			},
  3816  			{
  3817  				Name:          svc2port,
  3818  				ContainerPort: int32(containerPort),
  3819  				Protocol:      v1.ProtocolUDP,
  3820  			},
  3821  		}
  3822  
  3823  		podname1 := "pod1"
  3824  
  3825  		createPodOrFail(ctx, f, ns, podname1, jig.Labels, containerPorts, "netexec", "--http-port", strconv.Itoa(containerPort), "--udp-port", strconv.Itoa(containerPort))
  3826  		validateEndpointsPortsWithProtocolsOrFail(cs, ns, serviceName, fullPortsByPodName{podname1: containerPorts})
  3827  
  3828  		ginkgo.By("Checking if the Service forwards traffic to pods")
  3829  		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
  3830  		err = jig.CheckServiceReachability(ctx, svc, execPod)
  3831  		framework.ExpectNoError(err)
  3832  		e2epod.DeletePodOrFail(ctx, cs, ns, podname1)
  3833  	})
  3834  
  3835  	// These is [Serial] because it can't run at the same time as the
  3836  	// [Feature:SCTPConnectivity] tests, since they may cause sctp.ko to be loaded.
  3837  	f.It("should allow creating a basic SCTP service with pod and endpoints [LinuxOnly]", f.WithSerial(), func(ctx context.Context) {
  3838  		serviceName := "sctp-endpoint-test"
  3839  		ns := f.Namespace.Name
  3840  		jig := e2eservice.NewTestJig(cs, ns, serviceName)
  3841  
  3842  		ginkgo.By("getting the state of the sctp module on nodes")
  3843  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
  3844  		framework.ExpectNoError(err)
  3845  		sctpLoadedAtStart := CheckSCTPModuleLoadedOnNodes(ctx, f, nodes)
  3846  
  3847  		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
  3848  		_, err = jig.CreateSCTPServiceWithPort(ctx, nil, 5060)
  3849  		framework.ExpectNoError(err)
  3850  		ginkgo.DeferCleanup(func(ctx context.Context) {
  3851  			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
  3852  			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
  3853  		})
  3854  
  3855  		err = e2enetwork.WaitForService(ctx, f.ClientSet, ns, serviceName, true, 5*time.Second, e2eservice.TestTimeout)
  3856  		framework.ExpectNoError(err, fmt.Sprintf("error while waiting for service:%s err: %v", serviceName, err))
  3857  
  3858  		ginkgo.By("validating endpoints do not exist yet")
  3859  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
  3860  
  3861  		ginkgo.By("creating a pod for the service")
  3862  		names := map[string]bool{}
  3863  
  3864  		name1 := "pod1"
  3865  
  3866  		createPodOrFail(ctx, f, ns, name1, jig.Labels, []v1.ContainerPort{{ContainerPort: 5060, Protocol: v1.ProtocolSCTP}})
  3867  		names[name1] = true
  3868  		ginkgo.DeferCleanup(func(ctx context.Context) {
  3869  			for name := range names {
  3870  				err := cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
  3871  				framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
  3872  			}
  3873  		})
  3874  
  3875  		ginkgo.By("validating endpoints exists")
  3876  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{name1: {5060}})
  3877  
  3878  		ginkgo.By("deleting the pod")
  3879  		e2epod.DeletePodOrFail(ctx, cs, ns, name1)
  3880  		delete(names, name1)
  3881  		ginkgo.By("validating endpoints do not exist anymore")
  3882  		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
  3883  
  3884  		ginkgo.By("validating sctp module is still not loaded")
  3885  		sctpLoadedAtEnd := CheckSCTPModuleLoadedOnNodes(ctx, f, nodes)
  3886  		if !sctpLoadedAtStart && sctpLoadedAtEnd {
  3887  			framework.Failf("The state of the sctp module has changed due to the test case")
  3888  		}
  3889  	})
  3890  })
  3891  
  3892  // execAffinityTestForSessionAffinityTimeout is a helper function that wrap the logic of
  3893  // affinity test for non-load-balancer services. Session affinity will be
  3894  // enabled when the service is created and a short timeout will be configured so
  3895  // session affinity must change after the timeout expirese.
  3896  func execAffinityTestForSessionAffinityTimeout(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
  3897  	ns := f.Namespace.Name
  3898  	numPods, servicePort, serviceName := 3, defaultServeHostnameServicePort, svc.ObjectMeta.Name
  3899  	ginkgo.By("creating service in namespace " + ns)
  3900  	serviceType := svc.Spec.Type
  3901  	// set an affinity timeout equal to the number of connection requests
  3902  	svcSessionAffinityTimeout := int32(SessionAffinityTimeout)
  3903  	svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  3904  	svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
  3905  		ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
  3906  	}
  3907  	_, _, err := StartServeHostnameService(ctx, cs, svc, ns, numPods)
  3908  	framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
  3909  	ginkgo.DeferCleanup(StopServeHostnameService, cs, ns, serviceName)
  3910  	jig := e2eservice.NewTestJig(cs, ns, serviceName)
  3911  	svc, err = jig.Client.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
  3912  	framework.ExpectNoError(err, "failed to fetch service: %s in namespace: %s", serviceName, ns)
  3913  	var svcIP string
  3914  	if serviceType == v1.ServiceTypeNodePort {
  3915  		nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
  3916  		framework.ExpectNoError(err)
  3917  		// The node addresses must have the same IP family as the ClusterIP
  3918  		family := v1.IPv4Protocol
  3919  		if netutils.IsIPv6String(svc.Spec.ClusterIP) {
  3920  			family = v1.IPv6Protocol
  3921  		}
  3922  		svcIP = e2enode.FirstAddressByTypeAndFamily(nodes, v1.NodeInternalIP, family)
  3923  		gomega.Expect(svcIP).NotTo(gomega.BeEmpty(), "failed to get Node internal IP for family: %s", family)
  3924  		servicePort = int(svc.Spec.Ports[0].NodePort)
  3925  	} else {
  3926  		svcIP = svc.Spec.ClusterIP
  3927  	}
  3928  
  3929  	execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod-affinity", nil)
  3930  	ginkgo.DeferCleanup(func(ctx context.Context) {
  3931  		framework.Logf("Cleaning up the exec pod")
  3932  		err := cs.CoreV1().Pods(ns).Delete(ctx, execPod.Name, metav1.DeleteOptions{})
  3933  		framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPod.Name, ns)
  3934  	})
  3935  	err = jig.CheckServiceReachability(ctx, svc, execPod)
  3936  	framework.ExpectNoError(err)
  3937  
  3938  	// the service should be sticky until the timeout expires
  3939  	if !checkAffinity(ctx, cs, execPod, svcIP, servicePort, true) {
  3940  		framework.Failf("the service %s (%s:%d) should be sticky until the timeout expires", svc.Name, svcIP, servicePort)
  3941  	}
  3942  	// but it should return different hostnames after the timeout expires
  3943  	// try several times to avoid the probability that we hit the same pod twice
  3944  	hosts := sets.NewString()
  3945  	cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, net.JoinHostPort(svcIP, strconv.Itoa(servicePort)))
  3946  	for i := 0; i < 10; i++ {
  3947  		hostname, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
  3948  		if err == nil {
  3949  			hosts.Insert(hostname)
  3950  			if hosts.Len() > 1 {
  3951  				return
  3952  			}
  3953  			// In some case, ipvs didn't deleted the persistent connection after timeout expired,
  3954  			// use 'ipvsadm -lnc' command can found the expire time become '13171233:02' after '00:00'
  3955  			//
  3956  			// pro expire state       source             virtual            destination
  3957  			// TCP 00:00  NONE        10.105.253.160:0   10.105.253.160:80  10.244.1.25:9376
  3958  			//
  3959  			// pro expire state       source             virtual            destination
  3960  			// TCP 13171233:02 NONE        10.105.253.160:0   10.105.253.160:80  10.244.1.25:9376
  3961  			//
  3962  			// And 2 seconds later, the connection will be ensure deleted,
  3963  			// so we sleep 'svcSessionAffinityTimeout+5' seconds to avoid this issue.
  3964  			// TODO: figure out why the expired connection didn't be deleted and fix this issue in ipvs side.
  3965  			time.Sleep(time.Duration(svcSessionAffinityTimeout+5) * time.Second)
  3966  		}
  3967  	}
  3968  	framework.Fail("Session is sticky after reaching the timeout")
  3969  }
  3970  
  3971  func execAffinityTestForNonLBServiceWithTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
  3972  	execAffinityTestForNonLBServiceWithOptionalTransition(ctx, f, cs, svc, true)
  3973  }
  3974  
  3975  func execAffinityTestForNonLBService(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
  3976  	execAffinityTestForNonLBServiceWithOptionalTransition(ctx, f, cs, svc, false)
  3977  }
  3978  
  3979  // execAffinityTestForNonLBServiceWithOptionalTransition is a helper function that wrap the logic of
  3980  // affinity test for non-load-balancer services. Session affinity will be
  3981  // enabled when the service is created. If parameter isTransitionTest is true,
  3982  // session affinity will be switched off/on and test if the service converges
  3983  // to a stable affinity state.
  3984  func execAffinityTestForNonLBServiceWithOptionalTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) {
  3985  	ns := f.Namespace.Name
  3986  	numPods, servicePort, serviceName := 3, defaultServeHostnameServicePort, svc.ObjectMeta.Name
  3987  	ginkgo.By("creating service in namespace " + ns)
  3988  	serviceType := svc.Spec.Type
  3989  	svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  3990  	_, _, err := StartServeHostnameService(ctx, cs, svc, ns, numPods)
  3991  	framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
  3992  	ginkgo.DeferCleanup(StopServeHostnameService, cs, ns, serviceName)
  3993  	jig := e2eservice.NewTestJig(cs, ns, serviceName)
  3994  	svc, err = jig.Client.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
  3995  	framework.ExpectNoError(err, "failed to fetch service: %s in namespace: %s", serviceName, ns)
  3996  	var svcIP string
  3997  	if serviceType == v1.ServiceTypeNodePort {
  3998  		nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
  3999  		framework.ExpectNoError(err)
  4000  		// The node addresses must have the same IP family as the ClusterIP
  4001  		family := v1.IPv4Protocol
  4002  		if netutils.IsIPv6String(svc.Spec.ClusterIP) {
  4003  			family = v1.IPv6Protocol
  4004  		}
  4005  		svcIP = e2enode.FirstAddressByTypeAndFamily(nodes, v1.NodeInternalIP, family)
  4006  		gomega.Expect(svcIP).NotTo(gomega.BeEmpty(), "failed to get Node internal IP for family: %s", family)
  4007  		servicePort = int(svc.Spec.Ports[0].NodePort)
  4008  	} else {
  4009  		svcIP = svc.Spec.ClusterIP
  4010  	}
  4011  
  4012  	execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod-affinity", nil)
  4013  	ginkgo.DeferCleanup(func(ctx context.Context) {
  4014  		framework.Logf("Cleaning up the exec pod")
  4015  		err := cs.CoreV1().Pods(ns).Delete(ctx, execPod.Name, metav1.DeleteOptions{})
  4016  		framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPod.Name, ns)
  4017  	})
  4018  	err = jig.CheckServiceReachability(ctx, svc, execPod)
  4019  	framework.ExpectNoError(err)
  4020  
  4021  	if !isTransitionTest {
  4022  		if !checkAffinity(ctx, cs, execPod, svcIP, servicePort, true) {
  4023  			framework.Failf("Failed to check affinity for service %s/%s", ns, svc.Name)
  4024  		}
  4025  	}
  4026  	if isTransitionTest {
  4027  		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  4028  			svc.Spec.SessionAffinity = v1.ServiceAffinityNone
  4029  		})
  4030  		framework.ExpectNoError(err)
  4031  		if !checkAffinity(ctx, cs, execPod, svcIP, servicePort, false) {
  4032  			framework.Failf("Failed to check affinity for service %s/%s without session affinity", ns, svc.Name)
  4033  		}
  4034  		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  4035  			svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  4036  		})
  4037  		framework.ExpectNoError(err)
  4038  		if !checkAffinity(ctx, cs, execPod, svcIP, servicePort, true) {
  4039  			framework.Failf("Failed to check affinity for service %s/%s with session affinity", ns, svc.Name)
  4040  		}
  4041  	}
  4042  }
  4043  
  4044  func execAffinityTestForLBServiceWithTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
  4045  	execAffinityTestForLBServiceWithOptionalTransition(ctx, f, cs, svc, true)
  4046  }
  4047  
  4048  func execAffinityTestForLBService(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
  4049  	execAffinityTestForLBServiceWithOptionalTransition(ctx, f, cs, svc, false)
  4050  }
  4051  
  4052  // execAffinityTestForLBServiceWithOptionalTransition is a helper function that wrap the logic of
  4053  // affinity test for load balancer services, similar to
  4054  // execAffinityTestForNonLBServiceWithOptionalTransition.
  4055  func execAffinityTestForLBServiceWithOptionalTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) {
  4056  	numPods, ns, serviceName := 3, f.Namespace.Name, svc.ObjectMeta.Name
  4057  
  4058  	ginkgo.By("creating service in namespace " + ns)
  4059  	svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  4060  	_, _, err := StartServeHostnameService(ctx, cs, svc, ns, numPods)
  4061  	framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
  4062  	jig := e2eservice.NewTestJig(cs, ns, serviceName)
  4063  	ginkgo.By("waiting for loadbalancer for service " + ns + "/" + serviceName)
  4064  	svc, err = jig.WaitForLoadBalancer(ctx, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
  4065  	framework.ExpectNoError(err)
  4066  	ginkgo.DeferCleanup(func(ctx context.Context) {
  4067  		podNodePairs, err := e2enode.PodNodePairs(ctx, cs, ns)
  4068  		framework.Logf("[pod,node] pairs: %+v; err: %v", podNodePairs, err)
  4069  		_ = StopServeHostnameService(ctx, cs, ns, serviceName)
  4070  		lb := cloudprovider.DefaultLoadBalancerName(svc)
  4071  		framework.Logf("cleaning load balancer resource for %s", lb)
  4072  		e2eservice.CleanupServiceResources(ctx, cs, lb, framework.TestContext.CloudConfig.Region, framework.TestContext.CloudConfig.Zone)
  4073  	})
  4074  	ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
  4075  	port := int(svc.Spec.Ports[0].Port)
  4076  
  4077  	if !isTransitionTest {
  4078  		if !checkAffinity(ctx, cs, nil, ingressIP, port, true) {
  4079  			framework.Failf("Failed to verify affinity for loadbalance service %s/%s", ns, serviceName)
  4080  		}
  4081  	}
  4082  	if isTransitionTest {
  4083  		svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  4084  			svc.Spec.SessionAffinity = v1.ServiceAffinityNone
  4085  		})
  4086  		framework.ExpectNoError(err)
  4087  		if !checkAffinity(ctx, cs, nil, ingressIP, port, false) {
  4088  			framework.Failf("Failed to verify affinity for loadbalance service %s/%s without session affinity ", ns, serviceName)
  4089  		}
  4090  		svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
  4091  			svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  4092  		})
  4093  		framework.ExpectNoError(err)
  4094  		if !checkAffinity(ctx, cs, nil, ingressIP, port, true) {
  4095  			framework.Failf("Failed to verify affinity for loadbalance service %s/%s with session affinity ", ns, serviceName)
  4096  		}
  4097  	}
  4098  }
  4099  
  4100  func createAndGetExternalServiceFQDN(ctx context.Context, cs clientset.Interface, ns, serviceName string) string {
  4101  	_, _, err := StartServeHostnameService(ctx, cs, getServeHostnameService(serviceName), ns, 2)
  4102  	framework.ExpectNoError(err, "Expected Service %s to be running", serviceName)
  4103  	return fmt.Sprintf("%s.%s.svc.%s", serviceName, ns, framework.TestContext.ClusterDNSDomain)
  4104  }
  4105  
  4106  func createPausePodDeployment(ctx context.Context, cs clientset.Interface, name, ns string, replicas int) *appsv1.Deployment {
  4107  	labels := map[string]string{"deployment": "agnhost-pause"}
  4108  	pauseDeployment := e2edeployment.NewDeployment(name, int32(replicas), labels, "", "", appsv1.RollingUpdateDeploymentStrategyType)
  4109  
  4110  	pauseDeployment.Spec.Template.Spec.Containers[0] = e2epod.NewAgnhostContainer("agnhost-pause", nil, nil, "pause")
  4111  	pauseDeployment.Spec.Template.Spec.Affinity = &v1.Affinity{
  4112  		PodAntiAffinity: &v1.PodAntiAffinity{
  4113  			RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
  4114  				{
  4115  					LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
  4116  					TopologyKey:   "kubernetes.io/hostname",
  4117  					Namespaces:    []string{ns},
  4118  				},
  4119  			},
  4120  		},
  4121  	}
  4122  
  4123  	deployment, err := cs.AppsV1().Deployments(ns).Create(ctx, pauseDeployment, metav1.CreateOptions{})
  4124  	framework.ExpectNoError(err, "Error in creating deployment for pause pod")
  4125  	return deployment
  4126  }
  4127  
  4128  // createPodOrFail creates a pod with the specified containerPorts.
  4129  func createPodOrFail(ctx context.Context, f *framework.Framework, ns, name string, labels map[string]string, containerPorts []v1.ContainerPort, args ...string) {
  4130  	ginkgo.By(fmt.Sprintf("Creating pod %s in namespace %s", name, ns))
  4131  	pod := e2epod.NewAgnhostPod(ns, name, nil, nil, containerPorts, args...)
  4132  	pod.ObjectMeta.Labels = labels
  4133  	// Add a dummy environment variable to work around a docker issue.
  4134  	// https://github.com/docker/docker/issues/14203
  4135  	pod.Spec.Containers[0].Env = []v1.EnvVar{{Name: "FOO", Value: " "}}
  4136  	e2epod.NewPodClient(f).CreateSync(ctx, pod)
  4137  }
  4138  
  4139  // launchHostExecPod launches a hostexec pod in the given namespace and waits
  4140  // until it's Running
  4141  func launchHostExecPod(ctx context.Context, client clientset.Interface, ns, name string) *v1.Pod {
  4142  	framework.Logf("Creating new host exec pod")
  4143  	hostExecPod := e2epod.NewExecPodSpec(ns, name, true)
  4144  	pod, err := client.CoreV1().Pods(ns).Create(ctx, hostExecPod, metav1.CreateOptions{})
  4145  	framework.ExpectNoError(err)
  4146  	err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, client, name, ns, framework.PodStartTimeout)
  4147  	framework.ExpectNoError(err)
  4148  	return pod
  4149  }
  4150  
  4151  // checkReachabilityFromPod checks reachability from the specified pod.
  4152  func checkReachabilityFromPod(expectToBeReachable bool, timeout time.Duration, namespace, pod, target string) {
  4153  	cmd := fmt.Sprintf("wget -T 5 -qO- %q", target)
  4154  	err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
  4155  		_, err := e2eoutput.RunHostCmd(namespace, pod, cmd)
  4156  		if expectToBeReachable && err != nil {
  4157  			framework.Logf("Expect target to be reachable. But got err: %v. Retry until timeout", err)
  4158  			return false, nil
  4159  		}
  4160  
  4161  		if !expectToBeReachable && err == nil {
  4162  			framework.Logf("Expect target NOT to be reachable. But it is reachable. Retry until timeout")
  4163  			return false, nil
  4164  		}
  4165  		return true, nil
  4166  	})
  4167  	framework.ExpectNoError(err)
  4168  }
  4169  
  4170  // enableAndDisableInternalLB returns two functions for enabling and disabling the internal load balancer
  4171  // setting for the supported cloud providers (currently GCE/GKE and Azure) and empty functions for others.
  4172  func enableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(svc *v1.Service)) {
  4173  	return framework.TestContext.CloudConfig.Provider.EnableAndDisableInternalLB()
  4174  }
  4175  
  4176  func validatePorts(ep, expectedEndpoints portsByPodUID) error {
  4177  	if len(ep) != len(expectedEndpoints) {
  4178  		// should not happen because we check this condition before
  4179  		return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints)
  4180  	}
  4181  	for podUID := range expectedEndpoints {
  4182  		if _, ok := ep[podUID]; !ok {
  4183  			return fmt.Errorf("endpoint %v not found", podUID)
  4184  		}
  4185  		if len(ep[podUID]) != len(expectedEndpoints[podUID]) {
  4186  			return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
  4187  		}
  4188  		sort.Ints(ep[podUID])
  4189  		sort.Ints(expectedEndpoints[podUID])
  4190  		for index := range ep[podUID] {
  4191  			if ep[podUID][index] != expectedEndpoints[podUID][index] {
  4192  				return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
  4193  			}
  4194  		}
  4195  	}
  4196  	return nil
  4197  }
  4198  
  4199  func translatePodNameToUID(ctx context.Context, c clientset.Interface, ns string, expectedEndpoints portsByPodName) (portsByPodUID, error) {
  4200  	portsByUID := make(portsByPodUID)
  4201  	for name, portList := range expectedEndpoints {
  4202  		pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
  4203  		if err != nil {
  4204  			return nil, fmt.Errorf("failed to get pod %s, that's pretty weird. validation failed: %w", name, err)
  4205  		}
  4206  		portsByUID[pod.ObjectMeta.UID] = portList
  4207  	}
  4208  	return portsByUID, nil
  4209  }
  4210  
  4211  // validateEndpointsPortsOrFail validates that the given service exists and is served by the given expectedEndpoints.
  4212  func validateEndpointsPortsOrFail(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectedEndpoints portsByPodName) {
  4213  	ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
  4214  	expectedPortsByPodUID, err := translatePodNameToUID(ctx, c, namespace, expectedEndpoints)
  4215  	framework.ExpectNoError(err, "failed to translate pod name to UID, ns:%s, expectedEndpoints:%v", namespace, expectedEndpoints)
  4216  
  4217  	var (
  4218  		pollErr error
  4219  		i       = 0
  4220  	)
  4221  	if pollErr = wait.PollImmediate(time.Second, framework.ServiceStartTimeout, func() (bool, error) {
  4222  		i++
  4223  
  4224  		ep, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
  4225  		if err != nil {
  4226  			framework.Logf("Failed go get Endpoints object: %v", err)
  4227  			// Retry the error
  4228  			return false, nil
  4229  		}
  4230  		portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep))
  4231  		if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil {
  4232  			if i%5 == 0 {
  4233  				framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
  4234  			}
  4235  			return false, nil
  4236  		}
  4237  
  4238  		// If EndpointSlice API is enabled, then validate if appropriate EndpointSlice objects
  4239  		// were also create/updated/deleted.
  4240  		if _, err := c.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String()); err == nil {
  4241  			opts := metav1.ListOptions{
  4242  				LabelSelector: "kubernetes.io/service-name=" + serviceName,
  4243  			}
  4244  			es, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, opts)
  4245  			if err != nil {
  4246  				framework.Logf("Failed go list EndpointSlice objects: %v", err)
  4247  				// Retry the error
  4248  				return false, nil
  4249  			}
  4250  			portsByUID = portsByPodUID(e2eendpointslice.GetContainerPortsByPodUID(es.Items))
  4251  			if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil {
  4252  				if i%5 == 0 {
  4253  					framework.Logf("Unexpected endpoint slices: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
  4254  				}
  4255  				return false, nil
  4256  			}
  4257  		}
  4258  		framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v",
  4259  			serviceName, namespace, expectedEndpoints)
  4260  		return true, nil
  4261  	}); pollErr != nil {
  4262  		if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err == nil {
  4263  			for _, pod := range pods.Items {
  4264  				framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
  4265  			}
  4266  		} else {
  4267  			framework.Logf("Can't list pod debug info: %v", err)
  4268  		}
  4269  	}
  4270  	framework.ExpectNoError(pollErr, "error waithing for service %s in namespace %s to expose endpoints %v: %v", serviceName, namespace, expectedEndpoints)
  4271  }
  4272  
  4273  func restartApiserver(ctx context.Context, namespace string, cs clientset.Interface) error {
  4274  	if framework.ProviderIs("gke") {
  4275  		// GKE use a same-version master upgrade to teardown/recreate master.
  4276  		v, err := cs.Discovery().ServerVersion()
  4277  		if err != nil {
  4278  			return err
  4279  		}
  4280  		return e2eproviders.MasterUpgradeGKE(ctx, namespace, v.GitVersion[1:]) // strip leading 'v'
  4281  	}
  4282  
  4283  	return restartComponent(ctx, cs, kubeAPIServerLabelName, metav1.NamespaceSystem, map[string]string{clusterComponentKey: kubeAPIServerLabelName})
  4284  }
  4285  
  4286  // restartComponent restarts component static pod
  4287  func restartComponent(ctx context.Context, cs clientset.Interface, cName, ns string, matchLabels map[string]string) error {
  4288  	pods, err := e2epod.GetPods(ctx, cs, ns, matchLabels)
  4289  	if err != nil {
  4290  		return fmt.Errorf("failed to get %s's pods, err: %w", cName, err)
  4291  	}
  4292  	if len(pods) == 0 {
  4293  		return fmt.Errorf("%s pod count is 0", cName)
  4294  	}
  4295  
  4296  	if err := e2epod.DeletePodsWithGracePeriod(ctx, cs, pods, 0); err != nil {
  4297  		return fmt.Errorf("failed to restart component: %s, err: %w", cName, err)
  4298  	}
  4299  
  4300  	_, err = e2epod.PodsCreatedByLabel(ctx, cs, ns, cName, int32(len(pods)), labels.SelectorFromSet(matchLabels))
  4301  	return err
  4302  }
  4303  
  4304  // validateEndpointsPortsWithProtocolsOrFail validates that the given service exists and is served by the given expectedEndpoints.
  4305  func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints fullPortsByPodName) {
  4306  	ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
  4307  	expectedPortsByPodUID, err := translatePortsByPodNameToPortsByPodUID(c, namespace, expectedEndpoints)
  4308  	framework.ExpectNoError(err, "failed to translate pod name to UID, ns:%s, expectedEndpoints:%v", namespace, expectedEndpoints)
  4309  
  4310  	var (
  4311  		pollErr error
  4312  		i       = 0
  4313  	)
  4314  	if pollErr = wait.PollImmediate(time.Second, framework.ServiceStartTimeout, func() (bool, error) {
  4315  		i++
  4316  
  4317  		ep, err := c.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
  4318  		if err != nil {
  4319  			framework.Logf("Failed go get Endpoints object: %v", err)
  4320  			// Retry the error
  4321  			return false, nil
  4322  		}
  4323  		portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep))
  4324  		if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
  4325  			if i%5 == 0 {
  4326  				framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
  4327  			}
  4328  			return false, nil
  4329  		}
  4330  
  4331  		// If EndpointSlice API is enabled, then validate if appropriate EndpointSlice objects
  4332  		// were also create/updated/deleted.
  4333  		if _, err := c.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String()); err == nil {
  4334  			opts := metav1.ListOptions{
  4335  				LabelSelector: "kubernetes.io/service-name=" + serviceName,
  4336  			}
  4337  			es, err := c.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), opts)
  4338  			if err != nil {
  4339  				framework.Logf("Failed go list EndpointSlice objects: %v", err)
  4340  				// Retry the error
  4341  				return false, nil
  4342  			}
  4343  			portsByUID = fullPortsByPodUID(e2eendpointslice.GetFullContainerPortsByPodUID(es.Items))
  4344  			if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
  4345  				if i%5 == 0 {
  4346  					framework.Logf("Unexpected endpoint slices: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
  4347  				}
  4348  				return false, nil
  4349  			}
  4350  		}
  4351  		framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v",
  4352  			serviceName, namespace, expectedEndpoints)
  4353  		return true, nil
  4354  	}); pollErr != nil {
  4355  		if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{}); err == nil {
  4356  			for _, pod := range pods.Items {
  4357  				framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
  4358  			}
  4359  		} else {
  4360  			framework.Logf("Can't list pod debug info: %v", err)
  4361  		}
  4362  	}
  4363  	framework.ExpectNoError(pollErr, "error waithing for service %s in namespace %s to expose endpoints %v: %v", serviceName, namespace, expectedEndpoints)
  4364  }
  4365  
  4366  func translatePortsByPodNameToPortsByPodUID(c clientset.Interface, ns string, expectedEndpoints fullPortsByPodName) (fullPortsByPodUID, error) {
  4367  	portsByUID := make(fullPortsByPodUID)
  4368  	for name, portList := range expectedEndpoints {
  4369  		pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
  4370  		if err != nil {
  4371  			return nil, fmt.Errorf("failed to get pod %s, that's pretty weird. validation failed: %w", name, err)
  4372  		}
  4373  		portsByUID[pod.ObjectMeta.UID] = portList
  4374  	}
  4375  	return portsByUID, nil
  4376  }
  4377  
  4378  func validatePortsAndProtocols(ep, expectedEndpoints fullPortsByPodUID) error {
  4379  	if len(ep) != len(expectedEndpoints) {
  4380  		// should not happen because we check this condition before
  4381  		return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints)
  4382  	}
  4383  	for podUID := range expectedEndpoints {
  4384  		if _, ok := ep[podUID]; !ok {
  4385  			return fmt.Errorf("endpoint %v not found", podUID)
  4386  		}
  4387  		if len(ep[podUID]) != len(expectedEndpoints[podUID]) {
  4388  			return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
  4389  		}
  4390  		var match bool
  4391  		for _, epPort := range ep[podUID] {
  4392  			match = false
  4393  			for _, expectedPort := range expectedEndpoints[podUID] {
  4394  				if epPort.ContainerPort == expectedPort.ContainerPort && epPort.Protocol == expectedPort.Protocol {
  4395  					match = true
  4396  				}
  4397  			}
  4398  			if !match {
  4399  				return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
  4400  			}
  4401  		}
  4402  	}
  4403  	return nil
  4404  }
  4405  

View as plain text