...

Source file src/k8s.io/kubernetes/test/e2e/framework/network/utils.go

Documentation: k8s.io/kubernetes/test/e2e/framework/network

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package network
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"encoding/json"
    23  	"fmt"
    24  	"io"
    25  	"net"
    26  	"net/http"
    27  	"strconv"
    28  	"strings"
    29  	"time"
    30  
    31  	"github.com/onsi/ginkgo/v2"
    32  	v1 "k8s.io/api/core/v1"
    33  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/util/intstr"
    36  	utilnet "k8s.io/apimachinery/pkg/util/net"
    37  	"k8s.io/apimachinery/pkg/util/sets"
    38  	"k8s.io/apimachinery/pkg/util/uuid"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	clientset "k8s.io/client-go/kubernetes"
    41  	coreclientset "k8s.io/client-go/kubernetes/typed/core/v1"
    42  	"k8s.io/kubernetes/test/e2e/framework"
    43  	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
    44  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    45  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    46  	e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
    47  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    48  	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
    49  	imageutils "k8s.io/kubernetes/test/utils/image"
    50  	netutils "k8s.io/utils/net"
    51  )
    52  
const (
	// EndpointHTTPPort is an endpoint HTTP port for testing.
	EndpointHTTPPort = 8083
	// EndpointUDPPort is an endpoint UDP port for testing.
	EndpointUDPPort = 8081
	// EndpointSCTPPort is an endpoint SCTP port for testing.
	EndpointSCTPPort = 8082
	// testContainerHTTPPort is the HTTP port of the test container; the
	// Dial*/Get*FromTestContainer helpers send their curl requests to it.
	testContainerHTTPPort = 9080
	// ClusterHTTPPort is a cluster HTTP port for testing.
	ClusterHTTPPort = 80
	// ClusterUDPPort is a cluster UDP port for testing.
	ClusterUDPPort = 90
	// ClusterSCTPPort is a cluster SCTP port for testing.
	ClusterSCTPPort            = 95
	testPodName                = "test-container-pod"
	hostTestPodName            = "host-test-container-pod"
	nodePortServiceName        = "node-port-service"
	sessionAffinityServiceName = "session-affinity-service"
	// hitEndpointRetryDelay is the wait time between poll attempts of a
	// Service VIP and/or nodePort. Coupled with testTries it produces a net
	// timeout value.
	hitEndpointRetryDelay = 2 * time.Second
	// testTries is the number of retries to hit a given set of endpoints.
	// Needs to be high because we verify iptables statistical rr loadbalancing.
	testTries = 30
	// maxNetProxyPodsCount is the maximum number of pods in a test, to make
	// the test work in large clusters.
	maxNetProxyPodsCount = 10
	// SessionAffinityChecks is the number of checks to hit a given set of
	// endpoints when session affinity is enabled.
	SessionAffinityChecks = 10
	// RegexIPv4 is a regex to match IPv4 addresses. It matches four
	// dot-separated groups of decimal digits and does not range-check them.
	RegexIPv4 = "(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)"
	// RegexIPv6 is a regex to match IPv6 addresses.
	RegexIPv6                 = "(?:(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){6})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:::(?:(?:(?:[0-9a-fA-F]{1,4})):){5})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){4})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,1}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){3})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,2}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){2})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,3}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:[0-9a-fA-F]{1,4})):)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,4}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,5}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,6}(?:(?:[0-9a-fA-F]{1,4})))?::))))"
	resizeNodeReadyTimeout    = 2 * time.Minute
	resizeNodeNotReadyTimeout = 2 * time.Minute
	// echoHostname is the netexec dial command that makes the destination
	// echo its hostname.
	echoHostname = "hostname"
)
    92  
// NetexecImageName is the image name for agnhost, as resolved by
// imageutils.GetE2EImage(imageutils.Agnhost).
var NetexecImageName = imageutils.GetE2EImage(imageutils.Agnhost)
    95  
// Option is used to configure the NetworkingTestConfig object. An Option
// receives the config under construction and mutates it in place;
// NewNetworkingTestConfig applies every Option before running setup.
type Option func(*NetworkingTestConfig)
    98  
    99  // EnableSCTP listen on SCTP ports on the endpoints
   100  func EnableSCTP(config *NetworkingTestConfig) {
   101  	config.SCTPEnabled = true
   102  }
   103  
   104  // EnableDualStack create Dual Stack services
   105  func EnableDualStack(config *NetworkingTestConfig) {
   106  	config.DualStackEnabled = true
   107  }
   108  
   109  // UseHostNetwork run the test container with HostNetwork=true.
   110  func UseHostNetwork(config *NetworkingTestConfig) {
   111  	config.HostNetwork = true
   112  }
   113  
   114  // EndpointsUseHostNetwork run the endpoints pods with HostNetwork=true.
   115  func EndpointsUseHostNetwork(config *NetworkingTestConfig) {
   116  	config.EndpointsHostNetwork = true
   117  }
   118  
   119  // PreferExternalAddresses prefer node External Addresses for the tests
   120  func PreferExternalAddresses(config *NetworkingTestConfig) {
   121  	config.PreferExternalAddresses = true
   122  }
   123  
   124  // NewNetworkingTestConfig creates and sets up a new test config helper.
   125  func NewNetworkingTestConfig(ctx context.Context, f *framework.Framework, setters ...Option) *NetworkingTestConfig {
   126  	// default options
   127  	config := &NetworkingTestConfig{
   128  		f:         f,
   129  		Namespace: f.Namespace.Name,
   130  	}
   131  	for _, setter := range setters {
   132  		setter(config)
   133  	}
   134  	ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
   135  	config.setup(ctx, getServiceSelector())
   136  	return config
   137  }
   138  
   139  // NewCoreNetworkingTestConfig creates and sets up a new test config helper for Node E2E.
   140  func NewCoreNetworkingTestConfig(ctx context.Context, f *framework.Framework, hostNetwork bool) *NetworkingTestConfig {
   141  	// default options
   142  	config := &NetworkingTestConfig{
   143  		f:           f,
   144  		Namespace:   f.Namespace.Name,
   145  		HostNetwork: hostNetwork,
   146  	}
   147  	ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
   148  	config.setupCore(ctx, getServiceSelector())
   149  	return config
   150  }
   151  
   152  func getServiceSelector() map[string]string {
   153  	ginkgo.By("creating a selector")
   154  	selectorName := "selector-" + string(uuid.NewUUID())
   155  	serviceSelector := map[string]string{
   156  		selectorName: "true",
   157  	}
   158  	return serviceSelector
   159  }
   160  
// NetworkingTestConfig is a convenience class around some utility methods
// for testing kubeproxy/networking/services/endpoints.
type NetworkingTestConfig struct {
	// TestContainerPod is a test pod running the netexec image. It is capable
	// of executing tcp/udp requests against ip:port.
	TestContainerPod *v1.Pod
	// HostTestContainerPod is a pod running using the hostexec image.
	HostTestContainerPod *v1.Pod
	// HostNetwork reports whether the HostTestContainerPod is running with
	// HostNetwork=true.
	HostNetwork bool
	// EndpointsHostNetwork reports whether the endpoint Pods are running with
	// HostNetwork=true.
	EndpointsHostNetwork bool
	// SCTPEnabled reports whether the test pods are listening on an sctp
	// port. We need this as sctp tests are marked as disruptive as they may
	// load the sctp module.
	SCTPEnabled bool
	// DualStackEnabled enables dual stack on services.
	DualStackEnabled bool
	// EndpointPods are the pods belonging to the Service created by this
	// test config. Each invocation of `setup` creates a service with
	// 1 pod per node running the netexecImage.
	EndpointPods []*v1.Pod
	f            *framework.Framework
	podClient    *e2epod.PodClient
	// NodePortService is a Service with Type=NodePort spanning over all
	// endpointPods.
	NodePortService *v1.Service
	// SessionAffinityService is a Service with SessionAffinity=ClientIP
	// spanning over all endpointPods.
	SessionAffinityService *v1.Service
	// Nodes is a list of nodes in the cluster.
	Nodes []v1.Node
	// MaxTries is the number of retries tolerated for tests run against
	// endpoints and services created by this config.
	MaxTries int
	// ClusterIP is the ClusterIP of the Service created by this test config.
	ClusterIP string
	// SecondaryClusterIP is the secondary ClusterIP of the Service created by
	// this test config.
	SecondaryClusterIP string
	// NodeIP is an ExternalIP if the node has one,
	// or an InternalIP if not, for use in nodePort testing.
	NodeIP string
	// SecondaryNodeIP is an ExternalIP of the secondary IP family if the node
	// has one, or an InternalIP if not, for use in nodePort testing.
	SecondaryNodeIP string
	// The http/udp/sctp nodePorts of the Service.
	NodeHTTPPort int
	NodeUDPPort  int
	NodeSCTPPort int
	// Namespace is the kubernetes namespace within which all resources for
	// this config are created.
	Namespace string
	// PreferExternalAddresses reports whether to prefer node External
	// Addresses for the tests.
	PreferExternalAddresses bool
}
   215  
// NetexecDialResponse represents the response returned by the `netexec` subcommand of `agnhost`.
// In this file it is decoded from the JSON body returned by the /dial requests
// built by makeCURLDialCommand.
type NetexecDialResponse struct {
	// Responses is decoded from the "responses" JSON key.
	Responses []string `json:"responses"`
	// Errors is decoded from the "errors" JSON key.
	Errors []string `json:"errors"`
}
   221  
   222  // DialFromEndpointContainer executes a curl via kubectl exec in an endpoint container.   Returns an error to be handled by the caller.
   223  func (config *NetworkingTestConfig) DialFromEndpointContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
   224  	return config.DialFromContainer(ctx, protocol, echoHostname, config.EndpointPods[0].Status.PodIP, targetIP, EndpointHTTPPort, targetPort, maxTries, minTries, expectedEps)
   225  }
   226  
   227  // DialFromTestContainer executes a curl via kubectl exec in a test container. Returns an error to be handled by the caller.
   228  func (config *NetworkingTestConfig) DialFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
   229  	return config.DialFromContainer(ctx, protocol, echoHostname, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedEps)
   230  }
   231  
   232  // DialEchoFromTestContainer executes a curl via kubectl exec in a test container. The response is expected to match the echoMessage,  Returns an error to be handled by the caller.
   233  func (config *NetworkingTestConfig) DialEchoFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, echoMessage string) error {
   234  	expectedResponse := sets.NewString()
   235  	expectedResponse.Insert(echoMessage)
   236  	var dialCommand string
   237  
   238  	// NOTE(claudiub): netexec /dialCommand will send a request to the given targetIP and targetPort as follows:
   239  	// for HTTP: it will send a request to: http://targetIP:targetPort/dialCommand
   240  	// for UDP: it will send targetCommand as a message. The consumer receives the data message and looks for
   241  	// a few starting strings, including echo, and treats it accordingly.
   242  	if protocol == "http" {
   243  		dialCommand = fmt.Sprintf("echo?msg=%s", echoMessage)
   244  	} else {
   245  		dialCommand = fmt.Sprintf("echo%%20%s", echoMessage)
   246  	}
   247  	return config.DialFromContainer(ctx, protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedResponse)
   248  }
   249  
   250  // diagnoseMissingEndpoints prints debug information about the endpoints that
   251  // are NOT in the given list of foundEndpoints. These are the endpoints we
   252  // expected a response from.
   253  func (config *NetworkingTestConfig) diagnoseMissingEndpoints(foundEndpoints sets.String) {
   254  	for _, e := range config.EndpointPods {
   255  		if foundEndpoints.Has(e.Name) {
   256  			continue
   257  		}
   258  		framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name)
   259  		desc, _ := e2ekubectl.RunKubectl(
   260  			e.Namespace, "describe", "pod", e.Name, fmt.Sprintf("--namespace=%v", e.Namespace))
   261  		framework.Logf(desc)
   262  	}
   263  }
   264  
   265  // EndpointHostnames returns a set of hostnames for existing endpoints.
   266  func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
   267  	expectedEps := sets.NewString()
   268  	for _, p := range config.EndpointPods {
   269  		if config.EndpointsHostNetwork {
   270  			expectedEps.Insert(p.Spec.NodeSelector["kubernetes.io/hostname"])
   271  		} else {
   272  			expectedEps.Insert(p.Name)
   273  		}
   274  	}
   275  	return expectedEps
   276  }
   277  
   278  func makeCURLDialCommand(ipPort, dialCmd, protocol, targetIP string, targetPort int) string {
   279  	// The current versions of curl included in CentOS and RHEL distros
   280  	// misinterpret square brackets around IPv6 as globbing, so use the -g
   281  	// argument to disable globbing to handle the IPv6 case.
   282  	return fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
   283  		ipPort,
   284  		dialCmd,
   285  		protocol,
   286  		targetIP,
   287  		targetPort)
   288  }
   289  
   290  // DialFromContainer executes a curl via kubectl exec in a test container,
   291  // which might then translate to a tcp or udp request based on the protocol
   292  // argument in the url.
   293  //   - minTries is the minimum number of curl attempts required before declaring
   294  //     success. Set to 0 if you'd like to return as soon as all endpoints respond
   295  //     at least once.
   296  //   - maxTries is the maximum number of curl attempts. If this many attempts pass
   297  //     and we don't see all expected endpoints, the test fails.
   298  //   - targetIP is the source Pod IP that will dial the given dialCommand using the given protocol.
   299  //   - dialCommand is the command that the targetIP will send to the targetIP using the given protocol.
   300  //     the dialCommand should be formatted properly for the protocol (http: URL path+parameters,
   301  //     udp: command%20parameters, where parameters are optional)
   302  //   - expectedResponses is the unordered set of responses to wait for. The responses are based on
   303  //     the dialCommand; for example, for the dialCommand "hostname", the expectedResponses
   304  //     should contain the hostnames reported by each pod in the service through /hostName.
   305  //
   306  // maxTries == minTries will confirm that we see the expected endpoints and no
   307  // more for maxTries. Use this if you want to eg: fail a readiness check on a
   308  // pod and confirm it doesn't show up as an endpoint.
   309  // Returns nil if no error, or error message if failed after trying maxTries.
   310  func (config *NetworkingTestConfig) DialFromContainer(ctx context.Context, protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort, maxTries, minTries int, expectedResponses sets.String) error {
   311  	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
   312  	cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)
   313  
   314  	responses := sets.NewString()
   315  
   316  	for i := 0; i < maxTries; i++ {
   317  		resp, err := config.GetResponseFromContainer(ctx, protocol, dialCommand, containerIP, targetIP, containerHTTPPort, targetPort)
   318  		if err != nil {
   319  			// A failure to kubectl exec counts as a try, not a hard fail.
   320  			// Also note that we will keep failing for maxTries in tests where
   321  			// we confirm unreachability.
   322  			framework.Logf("GetResponseFromContainer: %s", err)
   323  			continue
   324  		}
   325  		for _, response := range resp.Responses {
   326  			trimmed := strings.TrimSpace(response)
   327  			if trimmed != "" {
   328  				responses.Insert(trimmed)
   329  			}
   330  		}
   331  		framework.Logf("Waiting for responses: %v", expectedResponses.Difference(responses))
   332  
   333  		// Check against i+1 so we exit if minTries == maxTries.
   334  		if (responses.Equal(expectedResponses) || responses.Len() == 0 && expectedResponses.Len() == 0) && i+1 >= minTries {
   335  			framework.Logf("reached %v after %v/%v tries", targetIP, i, maxTries)
   336  			return nil
   337  		}
   338  		// TODO: get rid of this delay #36281
   339  		time.Sleep(hitEndpointRetryDelay)
   340  	}
   341  	if dialCommand == echoHostname {
   342  		config.diagnoseMissingEndpoints(responses)
   343  	}
   344  	returnMsg := fmt.Errorf("did not find expected responses... \nTries %d\nCommand %v\nretrieved %v\nexpected %v", maxTries, cmd, responses, expectedResponses)
   345  	framework.Logf("encountered error during dial (%v)", returnMsg)
   346  	return returnMsg
   347  
   348  }
   349  
   350  // GetEndpointsFromTestContainer executes a curl via kubectl exec in a test container.
   351  func (config *NetworkingTestConfig) GetEndpointsFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, tries int) (sets.String, error) {
   352  	return config.GetEndpointsFromContainer(ctx, protocol, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, tries)
   353  }
   354  
   355  // GetEndpointsFromContainer executes a curl via kubectl exec in a test container,
   356  // which might then translate to a tcp or udp request based on the protocol argument
   357  // in the url. It returns all different endpoints from multiple retries.
   358  //   - tries is the number of curl attempts. If this many attempts pass and
   359  //     we don't see any endpoints, the test fails.
   360  func (config *NetworkingTestConfig) GetEndpointsFromContainer(ctx context.Context, protocol, containerIP, targetIP string, containerHTTPPort, targetPort, tries int) (sets.String, error) {
   361  	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
   362  	cmd := makeCURLDialCommand(ipPort, "hostName", protocol, targetIP, targetPort)
   363  
   364  	eps := sets.NewString()
   365  
   366  	for i := 0; i < tries; i++ {
   367  		stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
   368  		if err != nil {
   369  			// A failure to kubectl exec counts as a try, not a hard fail.
   370  			// Also note that we will keep failing for maxTries in tests where
   371  			// we confirm unreachability.
   372  			framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
   373  		} else {
   374  			podInfo := fmt.Sprintf("name: %v, namespace: %v, hostIp: %v, podIp: %v, conditions: %v", config.TestContainerPod.Name, config.TestContainerPod.Namespace, config.TestContainerPod.Status.HostIP, config.TestContainerPod.Status.PodIP, config.TestContainerPod.Status.Conditions)
   375  			framework.Logf("Tries: %d, in try: %d, stdout: %v, stderr: %v, command run in Pod { %#v }", tries, i, stdout, stderr, podInfo)
   376  
   377  			var output NetexecDialResponse
   378  			if err := json.Unmarshal([]byte(stdout), &output); err != nil {
   379  				framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
   380  					cmd, config.TestContainerPod.Name, stdout, err)
   381  				continue
   382  			}
   383  
   384  			for _, hostName := range output.Responses {
   385  				trimmed := strings.TrimSpace(hostName)
   386  				if trimmed != "" {
   387  					eps.Insert(trimmed)
   388  				}
   389  			}
   390  			// TODO: get rid of this delay #36281
   391  			time.Sleep(hitEndpointRetryDelay)
   392  		}
   393  	}
   394  	return eps, nil
   395  }
   396  
   397  // GetResponseFromContainer executes a curl via kubectl exec in a container.
   398  func (config *NetworkingTestConfig) GetResponseFromContainer(ctx context.Context, protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort int) (NetexecDialResponse, error) {
   399  	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
   400  	cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)
   401  
   402  	stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
   403  	if err != nil {
   404  		return NetexecDialResponse{}, fmt.Errorf("failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
   405  	}
   406  
   407  	var output NetexecDialResponse
   408  	if err := json.Unmarshal([]byte(stdout), &output); err != nil {
   409  		return NetexecDialResponse{}, fmt.Errorf("failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
   410  			cmd, config.TestContainerPod.Name, stdout, err)
   411  	}
   412  	return output, nil
   413  }
   414  
   415  // GetResponseFromTestContainer executes a curl via kubectl exec in a test container.
   416  func (config *NetworkingTestConfig) GetResponseFromTestContainer(ctx context.Context, protocol, dialCommand, targetIP string, targetPort int) (NetexecDialResponse, error) {
   417  	return config.GetResponseFromContainer(ctx, protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort)
   418  }
   419  
   420  // GetHTTPCodeFromTestContainer executes a curl via kubectl exec in a test container and returns the status code.
   421  func (config *NetworkingTestConfig) GetHTTPCodeFromTestContainer(ctx context.Context, path, targetIP string, targetPort int) (int, error) {
   422  	cmd := fmt.Sprintf("curl -g -q -s -o /dev/null -w %%{http_code} http://%s:%d%s",
   423  		targetIP,
   424  		targetPort,
   425  		path)
   426  	stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
   427  	// We only care about the status code reported by curl,
   428  	// and want to return any other errors, such as cannot execute command in the Pod.
   429  	// If curl failed to connect to host, it would exit with code 7, which makes `ExecShellInPodWithFullOutput`
   430  	// return a non-nil error and output "000" to stdout.
   431  	if err != nil && len(stdout) == 0 {
   432  		return 0, fmt.Errorf("failed to execute %q: %v, stderr: %q", cmd, err, stderr)
   433  	}
   434  	code, err := strconv.Atoi(stdout)
   435  	if err != nil {
   436  		return 0, fmt.Errorf("failed to parse status code returned by healthz endpoint: %w, code: %s", err, stdout)
   437  	}
   438  	return code, nil
   439  }
   440  
   441  // DialFromNode executes a tcp/udp curl/nc request based on protocol via kubectl exec
   442  // in a test container running with host networking.
   443  //   - minTries is the minimum number of curl/nc attempts required before declaring
   444  //     success. If 0, then we return as soon as all endpoints succeed.
   445  //   - There is no logical change to test results if faillures happen AFTER endpoints have succeeded,
   446  //     hence over-padding minTries will NOT reverse a successful result and is thus not very useful yet
   447  //     (See the TODO about checking probability, which isn't implemented yet).
   448  //   - maxTries is the maximum number of curl/echo attempts before an error is returned.  The
   449  //     smaller this number is, the less 'slack' there is for declaring success.
   450  //   - if maxTries < expectedEps, this test is guaranteed to return an error, because all endpoints won't be hit.
   451  //   - maxTries == minTries will return as soon as all endpoints succeed (or fail once maxTries is reached without
   452  //     success on all endpoints).
   453  //     In general its prudent to have a high enough level of minTries to guarantee that all pods get a fair chance at receiving traffic.
   454  func (config *NetworkingTestConfig) DialFromNode(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
   455  	var cmd string
   456  	if protocol == "udp" {
   457  		cmd = fmt.Sprintf("echo hostName | nc -w 1 -u %s %d", targetIP, targetPort)
   458  	} else {
   459  		ipPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
   460  		// The current versions of curl included in CentOS and RHEL distros
   461  		// misinterpret square brackets around IPv6 as globbing, so use the -g
   462  		// argument to disable globbing to handle the IPv6 case.
   463  		cmd = fmt.Sprintf("curl -g -q -s --max-time 15 --connect-timeout 1 http://%s/hostName", ipPort)
   464  	}
   465  
   466  	// TODO: This simply tells us that we can reach the endpoints. Check that
   467  	// the probability of hitting a specific endpoint is roughly the same as
   468  	// hitting any other.
   469  	eps := sets.NewString()
   470  
   471  	filterCmd := fmt.Sprintf("%s | grep -v '^\\s*$'", cmd)
   472  	framework.Logf("Going to poll %v on port %v at least %v times, with a maximum of %v tries before failing", targetIP, targetPort, minTries, maxTries)
   473  	for i := 0; i < maxTries; i++ {
   474  		stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.HostTestContainerPod.Name, filterCmd)
   475  		if err != nil || len(stderr) > 0 {
   476  			// A failure to exec command counts as a try, not a hard fail.
   477  			// Also note that we will keep failing for maxTries in tests where
   478  			// we confirm unreachability.
   479  			framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", filterCmd, err, stdout, stderr)
   480  		} else {
   481  			trimmed := strings.TrimSpace(stdout)
   482  			if trimmed != "" {
   483  				eps.Insert(trimmed)
   484  			}
   485  		}
   486  
   487  		// Check against i+1 so we exit if minTries == maxTries.
   488  		if eps.Equal(expectedEps) && i+1 >= minTries {
   489  			framework.Logf("Found all %d expected endpoints: %+v", eps.Len(), eps.List())
   490  			return nil
   491  		}
   492  
   493  		framework.Logf("Waiting for %+v endpoints (expected=%+v, actual=%+v)", expectedEps.Difference(eps).List(), expectedEps.List(), eps.List())
   494  
   495  		// TODO: get rid of this delay #36281
   496  		time.Sleep(hitEndpointRetryDelay)
   497  	}
   498  
   499  	config.diagnoseMissingEndpoints(eps)
   500  	return fmt.Errorf("failed to find expected endpoints, \ntries %d\nCommand %v\nretrieved %v\nexpected %v", maxTries, cmd, eps, expectedEps)
   501  }
   502  
   503  // GetSelfURL executes a curl against the given path via kubectl exec into a
   504  // test container running with host networking, and fails if the output
   505  // doesn't match the expected string.
   506  func (config *NetworkingTestConfig) GetSelfURL(ctx context.Context, port int32, path string, expected string) {
   507  	cmd := fmt.Sprintf("curl -i -q -s --connect-timeout 1 http://localhost:%d%s", port, path)
   508  	ginkgo.By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
   509  	config.executeCurlCmd(ctx, cmd, expected)
   510  }
   511  
   512  // GetSelfURLStatusCode executes a curl against the given path via kubectl exec into a
   513  // test container running with host networking, and fails if the returned status
   514  // code doesn't match the expected string.
   515  func (config *NetworkingTestConfig) GetSelfURLStatusCode(ctx context.Context, port int32, path string, expected string) {
   516  	// check status code
   517  	cmd := fmt.Sprintf("curl -o /dev/null -i -q -s -w %%{http_code} --connect-timeout 1 http://localhost:%d%s", port, path)
   518  	ginkgo.By(fmt.Sprintf("Checking status code against http://localhost:%d%s", port, path))
   519  	config.executeCurlCmd(ctx, cmd, expected)
   520  }
   521  
   522  func (config *NetworkingTestConfig) executeCurlCmd(ctx context.Context, cmd string, expected string) {
   523  	// These are arbitrary timeouts. The curl command should pass on first try,
   524  	// unless remote server is starved/bootstrapping/restarting etc.
   525  	const retryInterval = 1 * time.Second
   526  	const retryTimeout = 30 * time.Second
   527  	podName := config.HostTestContainerPod.Name
   528  	var msg string
   529  	if pollErr := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(ctx context.Context) (bool, error) {
   530  		stdout, err := e2epodoutput.RunHostCmd(config.Namespace, podName, cmd)
   531  		if err != nil {
   532  			msg = fmt.Sprintf("failed executing cmd %v in %v/%v: %v", cmd, config.Namespace, podName, err)
   533  			framework.Logf(msg)
   534  			return false, nil
   535  		}
   536  		if !strings.Contains(stdout, expected) {
   537  			msg = fmt.Sprintf("successfully executed %v in %v/%v, but output '%v' doesn't contain expected string '%v'", cmd, config.Namespace, podName, stdout, expected)
   538  			framework.Logf(msg)
   539  			return false, nil
   540  		}
   541  		return true, nil
   542  	}); pollErr != nil {
   543  		framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", config.Namespace, podName)
   544  		desc, _ := e2ekubectl.RunKubectl(
   545  			config.Namespace, "describe", "pod", podName, fmt.Sprintf("--namespace=%v", config.Namespace))
   546  		framework.Logf("%s", desc)
   547  		framework.Failf("Timed out in %v: %v", retryTimeout, msg)
   548  	}
   549  }
   550  
   551  func (config *NetworkingTestConfig) createNetShellPodSpec(podName, hostname string) *v1.Pod {
   552  	netexecArgs := []string{
   553  		"netexec",
   554  		fmt.Sprintf("--http-port=%d", EndpointHTTPPort),
   555  		fmt.Sprintf("--udp-port=%d", EndpointUDPPort),
   556  	}
   557  	// In case of hostnetwork endpoints, we want to bind the udp listener to specific ip addresses.
   558  	// In order to cover legacy AND dualstack, we pass both the host ip and the two pod ips. Agnhost
   559  	// removes duplicates and so this will listen on both addresses (or on the single existing one).
   560  	if config.EndpointsHostNetwork {
   561  		netexecArgs = append(netexecArgs, "--udp-listen-addresses=$(HOST_IP),$(POD_IPS)")
   562  	}
   563  
   564  	probe := &v1.Probe{
   565  		InitialDelaySeconds: 10,
   566  		TimeoutSeconds:      30,
   567  		PeriodSeconds:       10,
   568  		SuccessThreshold:    1,
   569  		FailureThreshold:    3,
   570  		ProbeHandler: v1.ProbeHandler{
   571  			HTTPGet: &v1.HTTPGetAction{
   572  				Path: "/healthz",
   573  				Port: intstr.IntOrString{IntVal: EndpointHTTPPort},
   574  			},
   575  		},
   576  	}
   577  	pod := &v1.Pod{
   578  		TypeMeta: metav1.TypeMeta{
   579  			Kind:       "Pod",
   580  			APIVersion: "v1",
   581  		},
   582  		ObjectMeta: metav1.ObjectMeta{
   583  			Name:      podName,
   584  			Namespace: config.Namespace,
   585  		},
   586  		Spec: v1.PodSpec{
   587  			Containers: []v1.Container{
   588  				{
   589  					Name:            "webserver",
   590  					Image:           NetexecImageName,
   591  					ImagePullPolicy: v1.PullIfNotPresent,
   592  					Args:            netexecArgs,
   593  					Ports: []v1.ContainerPort{
   594  						{
   595  							Name:          "http",
   596  							ContainerPort: EndpointHTTPPort,
   597  						},
   598  						{
   599  							Name:          "udp",
   600  							ContainerPort: EndpointUDPPort,
   601  							Protocol:      v1.ProtocolUDP,
   602  						},
   603  					},
   604  					LivenessProbe:  probe,
   605  					ReadinessProbe: probe,
   606  				},
   607  			},
   608  			NodeSelector: map[string]string{
   609  				"kubernetes.io/hostname": hostname,
   610  			},
   611  		},
   612  	}
   613  	// we want sctp to be optional as it will load the sctp kernel module
   614  	if config.SCTPEnabled {
   615  		pod.Spec.Containers[0].Args = append(pod.Spec.Containers[0].Args, fmt.Sprintf("--sctp-port=%d", EndpointSCTPPort))
   616  		pod.Spec.Containers[0].Ports = append(pod.Spec.Containers[0].Ports, v1.ContainerPort{
   617  			Name:          "sctp",
   618  			ContainerPort: EndpointSCTPPort,
   619  			Protocol:      v1.ProtocolSCTP,
   620  		})
   621  	}
   622  
   623  	if config.EndpointsHostNetwork {
   624  		pod.Spec.Containers[0].Env = []v1.EnvVar{
   625  			{
   626  				Name: "HOST_IP",
   627  				ValueFrom: &v1.EnvVarSource{
   628  					FieldRef: &v1.ObjectFieldSelector{
   629  						FieldPath: "status.hostIP",
   630  					},
   631  				},
   632  			},
   633  			{
   634  				Name: "POD_IPS",
   635  				ValueFrom: &v1.EnvVarSource{
   636  					FieldRef: &v1.ObjectFieldSelector{
   637  						FieldPath: "status.podIPs",
   638  					},
   639  				},
   640  			},
   641  		}
   642  	}
   643  	return pod
   644  }
   645  
   646  func (config *NetworkingTestConfig) createTestPodSpec() *v1.Pod {
   647  	pod := &v1.Pod{
   648  		TypeMeta: metav1.TypeMeta{
   649  			Kind:       "Pod",
   650  			APIVersion: "v1",
   651  		},
   652  		ObjectMeta: metav1.ObjectMeta{
   653  			Name:      testPodName,
   654  			Namespace: config.Namespace,
   655  		},
   656  		Spec: v1.PodSpec{
   657  			Containers: []v1.Container{
   658  				{
   659  					Name:            "webserver",
   660  					Image:           NetexecImageName,
   661  					ImagePullPolicy: v1.PullIfNotPresent,
   662  					Args: []string{
   663  						"netexec",
   664  						fmt.Sprintf("--http-port=%d", testContainerHTTPPort),
   665  					},
   666  					Ports: []v1.ContainerPort{
   667  						{
   668  							Name:          "http",
   669  							ContainerPort: testContainerHTTPPort,
   670  						},
   671  					},
   672  				},
   673  			},
   674  		},
   675  	}
   676  	return pod
   677  }
   678  
   679  func (config *NetworkingTestConfig) createNodePortServiceSpec(svcName string, selector map[string]string, enableSessionAffinity bool) *v1.Service {
   680  	sessionAffinity := v1.ServiceAffinityNone
   681  	if enableSessionAffinity {
   682  		sessionAffinity = v1.ServiceAffinityClientIP
   683  	}
   684  	res := &v1.Service{
   685  		ObjectMeta: metav1.ObjectMeta{
   686  			Name: svcName,
   687  		},
   688  		Spec: v1.ServiceSpec{
   689  			Type: v1.ServiceTypeNodePort,
   690  			Ports: []v1.ServicePort{
   691  				{Port: ClusterHTTPPort, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(EndpointHTTPPort)},
   692  				{Port: ClusterUDPPort, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(EndpointUDPPort)},
   693  			},
   694  			Selector:        selector,
   695  			SessionAffinity: sessionAffinity,
   696  		},
   697  	}
   698  
   699  	if config.SCTPEnabled {
   700  		res.Spec.Ports = append(res.Spec.Ports, v1.ServicePort{Port: ClusterSCTPPort, Name: "sctp", Protocol: v1.ProtocolSCTP, TargetPort: intstr.FromInt32(EndpointSCTPPort)})
   701  	}
   702  	if config.DualStackEnabled {
   703  		requireDual := v1.IPFamilyPolicyRequireDualStack
   704  		res.Spec.IPFamilyPolicy = &requireDual
   705  	}
   706  	return res
   707  }
   708  
   709  func (config *NetworkingTestConfig) createNodePortService(ctx context.Context, selector map[string]string) {
   710  	config.NodePortService = config.CreateService(ctx, config.createNodePortServiceSpec(nodePortServiceName, selector, false))
   711  }
   712  
   713  func (config *NetworkingTestConfig) createSessionAffinityService(ctx context.Context, selector map[string]string) {
   714  	config.SessionAffinityService = config.CreateService(ctx, config.createNodePortServiceSpec(sessionAffinityServiceName, selector, true))
   715  }
   716  
   717  // DeleteNodePortService deletes NodePort service.
   718  func (config *NetworkingTestConfig) DeleteNodePortService(ctx context.Context) {
   719  	err := config.getServiceClient().Delete(ctx, config.NodePortService.Name, metav1.DeleteOptions{})
   720  	framework.ExpectNoError(err, "error while deleting NodePortService. err:%v)", err)
   721  	time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
   722  }
   723  
   724  func (config *NetworkingTestConfig) createTestPods(ctx context.Context) {
   725  	testContainerPod := config.createTestPodSpec()
   726  	hostTestContainerPod := e2epod.NewExecPodSpec(config.Namespace, hostTestPodName, config.HostNetwork)
   727  
   728  	config.createPod(ctx, testContainerPod)
   729  	if config.HostNetwork {
   730  		config.createPod(ctx, hostTestContainerPod)
   731  	}
   732  
   733  	framework.ExpectNoError(e2epod.WaitForPodNameRunningInNamespace(ctx, config.f.ClientSet, testContainerPod.Name, config.f.Namespace.Name))
   734  
   735  	var err error
   736  	config.TestContainerPod, err = config.getPodClient().Get(ctx, testContainerPod.Name, metav1.GetOptions{})
   737  	if err != nil {
   738  		framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
   739  	}
   740  
   741  	if config.HostNetwork {
   742  		framework.ExpectNoError(e2epod.WaitForPodNameRunningInNamespace(ctx, config.f.ClientSet, hostTestContainerPod.Name, config.f.Namespace.Name))
   743  		config.HostTestContainerPod, err = config.getPodClient().Get(ctx, hostTestContainerPod.Name, metav1.GetOptions{})
   744  		if err != nil {
   745  			framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
   746  		}
   747  	}
   748  }
   749  
   750  // CreateService creates the provided service in config.Namespace and returns created service
   751  func (config *NetworkingTestConfig) CreateService(ctx context.Context, serviceSpec *v1.Service) *v1.Service {
   752  	_, err := config.getServiceClient().Create(ctx, serviceSpec, metav1.CreateOptions{})
   753  	framework.ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
   754  
   755  	err = WaitForService(ctx, config.f.ClientSet, config.Namespace, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
   756  	framework.ExpectNoError(err, fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))
   757  
   758  	createdService, err := config.getServiceClient().Get(ctx, serviceSpec.Name, metav1.GetOptions{})
   759  	framework.ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
   760  
   761  	return createdService
   762  }
   763  
   764  // setupCore sets up the pods and core test config
   765  // mainly for simplified node e2e setup
   766  func (config *NetworkingTestConfig) setupCore(ctx context.Context, selector map[string]string) {
   767  	ginkgo.By("Creating the service pods in kubernetes")
   768  	podName := "netserver"
   769  	config.EndpointPods = config.createNetProxyPods(ctx, podName, selector)
   770  
   771  	ginkgo.By("Creating test pods")
   772  	config.createTestPods(ctx)
   773  
   774  	epCount := len(config.EndpointPods)
   775  
   776  	// Note that this is not O(n^2) in practice, because epCount SHOULD be < 10.  In cases that epCount is > 10, this would be prohibitively large.
   777  	// Check maxNetProxyPodsCount for details.
   778  	config.MaxTries = epCount*epCount + testTries
   779  	framework.Logf("Setting MaxTries for pod polling to %v for networking test based on endpoint count %v", config.MaxTries, epCount)
   780  }
   781  
   782  // setup includes setupCore and also sets up services
   783  func (config *NetworkingTestConfig) setup(ctx context.Context, selector map[string]string) {
   784  	config.setupCore(ctx, selector)
   785  
   786  	ginkgo.By("Getting node addresses")
   787  	framework.ExpectNoError(e2enode.WaitForAllNodesSchedulable(ctx, config.f.ClientSet, 10*time.Minute))
   788  	nodeList, err := e2enode.GetReadySchedulableNodes(ctx, config.f.ClientSet)
   789  	framework.ExpectNoError(err)
   790  
   791  	e2eskipper.SkipUnlessNodeCountIsAtLeast(2)
   792  	config.Nodes = nodeList.Items
   793  
   794  	ginkgo.By("Creating the service on top of the pods in kubernetes")
   795  	config.createNodePortService(ctx, selector)
   796  	config.createSessionAffinityService(ctx, selector)
   797  
   798  	for _, p := range config.NodePortService.Spec.Ports {
   799  		switch p.Protocol {
   800  		case v1.ProtocolUDP:
   801  			config.NodeUDPPort = int(p.NodePort)
   802  		case v1.ProtocolTCP:
   803  			config.NodeHTTPPort = int(p.NodePort)
   804  		case v1.ProtocolSCTP:
   805  			config.NodeSCTPPort = int(p.NodePort)
   806  		default:
   807  			continue
   808  		}
   809  	}
   810  
   811  	// obtain the ClusterIP
   812  	config.ClusterIP = config.NodePortService.Spec.ClusterIP
   813  	if config.DualStackEnabled {
   814  		config.SecondaryClusterIP = config.NodePortService.Spec.ClusterIPs[1]
   815  	}
   816  
   817  	// Obtain the primary IP family of the Cluster based on the first ClusterIP
   818  	// TODO: Eventually we should just be getting these from Spec.IPFamilies
   819  	// but for now that would only if the feature gate is enabled.
   820  	family := v1.IPv4Protocol
   821  	secondaryFamily := v1.IPv6Protocol
   822  	if netutils.IsIPv6String(config.ClusterIP) {
   823  		family = v1.IPv6Protocol
   824  		secondaryFamily = v1.IPv4Protocol
   825  	}
   826  	if config.PreferExternalAddresses {
   827  		// Get Node IPs from the cluster, ExternalIPs take precedence
   828  		config.NodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeExternalIP, family)
   829  	}
   830  	if config.NodeIP == "" {
   831  		config.NodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeInternalIP, family)
   832  	}
   833  	if config.DualStackEnabled {
   834  		if config.PreferExternalAddresses {
   835  			config.SecondaryNodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeExternalIP, secondaryFamily)
   836  		}
   837  		if config.SecondaryNodeIP == "" {
   838  			config.SecondaryNodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeInternalIP, secondaryFamily)
   839  		}
   840  	}
   841  
   842  	ginkgo.By("Waiting for NodePort service to expose endpoint")
   843  	err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
   844  	framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", nodePortServiceName, config.Namespace)
   845  	ginkgo.By("Waiting for Session Affinity service to expose endpoint")
   846  	err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, sessionAffinityServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
   847  	framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", sessionAffinityServiceName, config.Namespace)
   848  }
   849  
   850  func (config *NetworkingTestConfig) createNetProxyPods(ctx context.Context, podName string, selector map[string]string) []*v1.Pod {
   851  	framework.ExpectNoError(e2enode.WaitForAllNodesSchedulable(ctx, config.f.ClientSet, 10*time.Minute))
   852  	nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, config.f.ClientSet, maxNetProxyPodsCount)
   853  	framework.ExpectNoError(err)
   854  	nodes := nodeList.Items
   855  
   856  	// create pods, one for each node
   857  	createdPods := make([]*v1.Pod, 0, len(nodes))
   858  	for i, n := range nodes {
   859  		podName := fmt.Sprintf("%s-%d", podName, i)
   860  		hostname, _ := n.Labels["kubernetes.io/hostname"]
   861  		pod := config.createNetShellPodSpec(podName, hostname)
   862  		pod.ObjectMeta.Labels = selector
   863  		pod.Spec.HostNetwork = config.EndpointsHostNetwork
   864  
   865  		// NOTE(claudiub): In order to use HostNetwork on Windows, we need to use Privileged Containers.
   866  		if pod.Spec.HostNetwork && framework.NodeOSDistroIs("windows") {
   867  			e2epod.WithWindowsHostProcess(pod, "")
   868  		}
   869  		createdPod := config.createPod(ctx, pod)
   870  		createdPods = append(createdPods, createdPod)
   871  	}
   872  
   873  	// wait that all of them are up
   874  	runningPods := make([]*v1.Pod, 0, len(nodes))
   875  	for _, p := range createdPods {
   876  		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, config.f.ClientSet, p.Name, config.f.Namespace.Name, framework.PodStartTimeout))
   877  		rp, err := config.getPodClient().Get(ctx, p.Name, metav1.GetOptions{})
   878  		framework.ExpectNoError(err)
   879  		runningPods = append(runningPods, rp)
   880  	}
   881  
   882  	return runningPods
   883  }
   884  
   885  // DeleteNetProxyPod deletes the first endpoint pod and waits for it being removed.
   886  func (config *NetworkingTestConfig) DeleteNetProxyPod(ctx context.Context) {
   887  	pod := config.EndpointPods[0]
   888  	framework.ExpectNoError(config.getPodClient().Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0)))
   889  	config.EndpointPods = config.EndpointPods[1:]
   890  	// wait for pod being deleted.
   891  	err := e2epod.WaitForPodNotFoundInNamespace(ctx, config.f.ClientSet, pod.Name, config.Namespace, wait.ForeverTestTimeout)
   892  	if err != nil {
   893  		framework.Failf("Failed to delete %s pod: %v", pod.Name, err)
   894  	}
   895  	// wait for endpoint being removed.
   896  	err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
   897  	if err != nil {
   898  		framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
   899  	}
   900  	// wait for kube-proxy to catch up with the pod being deleted.
   901  	time.Sleep(5 * time.Second)
   902  }
   903  
   904  func (config *NetworkingTestConfig) createPod(ctx context.Context, pod *v1.Pod) *v1.Pod {
   905  	return config.getPodClient().Create(ctx, pod)
   906  }
   907  
   908  func (config *NetworkingTestConfig) getPodClient() *e2epod.PodClient {
   909  	if config.podClient == nil {
   910  		config.podClient = e2epod.NewPodClient(config.f)
   911  	}
   912  	return config.podClient
   913  }
   914  
   915  func (config *NetworkingTestConfig) getServiceClient() coreclientset.ServiceInterface {
   916  	return config.f.ClientSet.CoreV1().Services(config.Namespace)
   917  }
   918  
// HTTPPokeParams is a struct for HTTP poke parameters.
// PokeHTTP replaces a zero Timeout or ExpectCode with the default noted below,
// writing the default into the struct it was given.
type HTTPPokeParams struct {
	Timeout        time.Duration // default = 10 secs
	ExpectCode     int           // default = 200
	BodyContains   string        // if non-empty, the response body must contain this substring
	RetriableCodes []int         // unexpected status codes reported as HTTPRetryCode instead of HTTPWrongCode
	EnableHTTPS    bool          // poke an https:// URL instead of an http:// one
}
   927  
// HTTPPokeResult is a struct for HTTP poke result.
type HTTPPokeResult struct {
	// Status classifies the outcome; see the HTTPPokeStatus constants.
	Status HTTPPokeStatus
	Code   int    // HTTP code: 0 if the connection was not made
	Error  error  // if there was any error
	Body   []byte // if code != 0
}
   935  
// HTTPPokeStatus is string for representing HTTP poke status.
type HTTPPokeStatus string

// Statuses reported by PokeHTTP in HTTPPokeResult.Status.
const (
	// HTTPSuccess is HTTP poke status which is success.
	HTTPSuccess HTTPPokeStatus = "Success"
	// HTTPError is HTTP poke status which is error.
	// PokeHTTP uses it for request errors that are neither a timeout nor a
	// refused connection, and for failures while reading the response body.
	HTTPError HTTPPokeStatus = "UnknownError"
	// HTTPTimeout is HTTP poke status which is timeout.
	HTTPTimeout HTTPPokeStatus = "TimedOut"
	// HTTPRefused is HTTP poke status which is connection refused.
	HTTPRefused HTTPPokeStatus = "ConnectionRefused"
	// HTTPRetryCode is HTTP poke status which is retry code.
	// The response code was unexpected but listed in HTTPPokeParams.RetriableCodes.
	HTTPRetryCode HTTPPokeStatus = "RetryCode"
	// HTTPWrongCode is HTTP poke status which is wrong code.
	// The response code was unexpected and not listed as retriable.
	HTTPWrongCode HTTPPokeStatus = "WrongCode"
	// HTTPBadResponse is HTTP poke status which is bad response.
	// The response code matched but the body lacked HTTPPokeParams.BodyContains.
	HTTPBadResponse HTTPPokeStatus = "BadResponse"
	// Any time we add new errors, we should audit all callers of this.
)
   956  
   957  // PokeHTTP tries to connect to a host on a port for a given URL path.  Callers
   958  // can specify additional success parameters, if desired.
   959  //
   960  // The result status will be characterized as precisely as possible, given the
   961  // known users of this.
   962  //
   963  // The result code will be zero in case of any failure to connect, or non-zero
   964  // if the HTTP transaction completed (even if the other test params make this a
   965  // failure).
   966  //
   967  // The result error will be populated for any status other than Success.
   968  //
   969  // The result body will be populated if the HTTP transaction was completed, even
   970  // if the other test params make this a failure).
   971  func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult {
   972  	// Set default params.
   973  	if params == nil {
   974  		params = &HTTPPokeParams{}
   975  	}
   976  
   977  	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
   978  	var url string
   979  	if params.EnableHTTPS {
   980  		url = fmt.Sprintf("https://%s%s", hostPort, path)
   981  	} else {
   982  		url = fmt.Sprintf("http://%s%s", hostPort, path)
   983  	}
   984  
   985  	ret := HTTPPokeResult{}
   986  
   987  	// Sanity check inputs, because it has happened.  These are the only things
   988  	// that should hard fail the test - they are basically ASSERT()s.
   989  	if host == "" {
   990  		framework.Failf("Got empty host for HTTP poke (%s)", url)
   991  		return ret
   992  	}
   993  	if port == 0 {
   994  		framework.Failf("Got port==0 for HTTP poke (%s)", url)
   995  		return ret
   996  	}
   997  
   998  	if params.ExpectCode == 0 {
   999  		params.ExpectCode = http.StatusOK
  1000  	}
  1001  
  1002  	if params.Timeout == 0 {
  1003  		params.Timeout = 10 * time.Second
  1004  	}
  1005  
  1006  	framework.Logf("Poking %q", url)
  1007  
  1008  	resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout)
  1009  	if err != nil {
  1010  		ret.Error = err
  1011  		neterr, ok := err.(net.Error)
  1012  		if ok && neterr.Timeout() {
  1013  			ret.Status = HTTPTimeout
  1014  		} else if strings.Contains(err.Error(), "connection refused") {
  1015  			ret.Status = HTTPRefused
  1016  		} else {
  1017  			ret.Status = HTTPError
  1018  		}
  1019  		framework.Logf("Poke(%q): %v", url, err)
  1020  		return ret
  1021  	}
  1022  
  1023  	ret.Code = resp.StatusCode
  1024  
  1025  	defer resp.Body.Close()
  1026  	body, err := io.ReadAll(resp.Body)
  1027  	if err != nil {
  1028  		ret.Status = HTTPError
  1029  		ret.Error = fmt.Errorf("error reading HTTP body: %w", err)
  1030  		framework.Logf("Poke(%q): %v", url, ret.Error)
  1031  		return ret
  1032  	}
  1033  	ret.Body = make([]byte, len(body))
  1034  	copy(ret.Body, body)
  1035  
  1036  	if resp.StatusCode != params.ExpectCode {
  1037  		for _, code := range params.RetriableCodes {
  1038  			if resp.StatusCode == code {
  1039  				ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode)
  1040  				ret.Status = HTTPRetryCode
  1041  				framework.Logf("Poke(%q): %v", url, ret.Error)
  1042  				return ret
  1043  			}
  1044  		}
  1045  		ret.Status = HTTPWrongCode
  1046  		ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode)
  1047  		framework.Logf("Poke(%q): %v", url, ret.Error)
  1048  		return ret
  1049  	}
  1050  
  1051  	if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) {
  1052  		ret.Status = HTTPBadResponse
  1053  		ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body))
  1054  		framework.Logf("Poke(%q): %v", url, ret.Error)
  1055  		return ret
  1056  	}
  1057  
  1058  	ret.Status = HTTPSuccess
  1059  	framework.Logf("Poke(%q): success", url)
  1060  	return ret
  1061  }
  1062  
  1063  // Does an HTTP GET, but does not reuse TCP connections
  1064  // This masks problems where the iptables rule has changed, but we don't see it
  1065  func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
  1066  	tr := utilnet.SetTransportDefaults(&http.Transport{
  1067  		DisableKeepAlives: true,
  1068  		TLSClientConfig:   &tls.Config{InsecureSkipVerify: true},
  1069  	})
  1070  	client := &http.Client{
  1071  		Transport: tr,
  1072  		Timeout:   timeout,
  1073  	}
  1074  
  1075  	return client.Get(url)
  1076  }
  1077  
  1078  // TestUnderTemporaryNetworkFailure blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
  1079  // At the end (even in case of errors), the network traffic is brought back to normal.
  1080  // This function executes commands on a node so it will work only for some
  1081  // environments.
  1082  func TestUnderTemporaryNetworkFailure(ctx context.Context, c clientset.Interface, ns string, node *v1.Node, testFunc func(ctx context.Context)) {
  1083  	host, err := e2enode.GetSSHExternalIP(node)
  1084  	if err != nil {
  1085  		framework.Failf("Error getting node external ip : %v", err)
  1086  	}
  1087  	controlPlaneAddresses := framework.GetControlPlaneAddresses(ctx, c)
  1088  	ginkgo.By(fmt.Sprintf("block network traffic from node %s to the control plane", node.Name))
  1089  	defer func() {
  1090  		// This code will execute even if setting the iptables rule failed.
  1091  		// It is on purpose because we may have an error even if the new rule
  1092  		// had been inserted. (yes, we could look at the error code and ssh error
  1093  		// separately, but I prefer to stay on the safe side).
  1094  		ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the control plane", node.Name))
  1095  		for _, instanceAddress := range controlPlaneAddresses {
  1096  			UnblockNetwork(ctx, host, instanceAddress)
  1097  		}
  1098  	}()
  1099  
  1100  	framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
  1101  	if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
  1102  		framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
  1103  	}
  1104  	for _, instanceAddress := range controlPlaneAddresses {
  1105  		BlockNetwork(ctx, host, instanceAddress)
  1106  	}
  1107  
  1108  	framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
  1109  	if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
  1110  		framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
  1111  	}
  1112  
  1113  	testFunc(ctx)
  1114  	// network traffic is unblocked in a deferred function
  1115  }
  1116  
  1117  // BlockNetwork blocks network between the given from value and the given to value.
  1118  // The following helper functions can block/unblock network from source
  1119  // host to destination host by manipulating iptable rules.
  1120  // This function assumes it can ssh to the source host.
  1121  //
  1122  // Caution:
  1123  // Recommend to input IP instead of hostnames. Using hostnames will cause iptables to
  1124  // do a DNS lookup to resolve the name to an IP address, which will
  1125  // slow down the test and cause it to fail if DNS is absent or broken.
  1126  //
  1127  // Suggested usage pattern:
  1128  //
  1129  //	func foo() {
  1130  //		...
  1131  //		defer UnblockNetwork(from, to)
  1132  //		BlockNetwork(from, to)
  1133  //		...
  1134  //	}
  1135  func BlockNetwork(ctx context.Context, from string, to string) {
  1136  	framework.Logf("block network traffic from %s to %s", from, to)
  1137  	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
  1138  	dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
  1139  	if result, err := e2essh.SSH(ctx, dropCmd, from, framework.TestContext.Provider); result.Code != 0 || err != nil {
  1140  		e2essh.LogResult(result)
  1141  		framework.Failf("Unexpected error: %v", err)
  1142  	}
  1143  }
  1144  
  1145  // UnblockNetwork unblocks network between the given from value and the given to value.
  1146  func UnblockNetwork(ctx context.Context, from string, to string) {
  1147  	framework.Logf("Unblock network traffic from %s to %s", from, to)
  1148  	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
  1149  	undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
  1150  	// Undrop command may fail if the rule has never been created.
  1151  	// In such case we just lose 30 seconds, but the cluster is healthy.
  1152  	// But if the rule had been created and removing it failed, the node is broken and
  1153  	// not coming back. Subsequent tests will run or fewer nodes (some of the tests
  1154  	// may fail). Manual intervention is required in such case (recreating the
  1155  	// cluster solves the problem too).
  1156  	err := wait.PollWithContext(ctx, time.Millisecond*100, time.Second*30, func(ctx context.Context) (bool, error) {
  1157  		result, err := e2essh.SSH(ctx, undropCmd, from, framework.TestContext.Provider)
  1158  		if result.Code == 0 && err == nil {
  1159  			return true, nil
  1160  		}
  1161  		e2essh.LogResult(result)
  1162  		if err != nil {
  1163  			framework.Logf("Unexpected error: %v", err)
  1164  		}
  1165  		return false, nil
  1166  	})
  1167  	if err != nil {
  1168  		framework.Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
  1169  			"required on host %s: remove rule %s, if exists", from, iptablesRule)
  1170  	}
  1171  }
  1172  
  1173  // WaitForService waits until the service appears (exist == true), or disappears (exist == false)
  1174  func WaitForService(ctx context.Context, c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
  1175  	err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
  1176  		_, err := c.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
  1177  		switch {
  1178  		case err == nil:
  1179  			framework.Logf("Service %s in namespace %s found.", name, namespace)
  1180  			return exist, nil
  1181  		case apierrors.IsNotFound(err):
  1182  			framework.Logf("Service %s in namespace %s disappeared.", name, namespace)
  1183  			return !exist, nil
  1184  		case err != nil:
  1185  			framework.Logf("Non-retryable failure while getting service.")
  1186  			return false, err
  1187  		default:
  1188  			framework.Logf("Get service %s in namespace %s failed: %v", name, namespace, err)
  1189  			return false, nil
  1190  		}
  1191  	})
  1192  	if err != nil {
  1193  		stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
  1194  		return fmt.Errorf("error waiting for service %s/%s %s: %w", namespace, name, stateMsg[exist], err)
  1195  	}
  1196  	return nil
  1197  }
  1198  

View as plain text