    17  package network
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"net"
    24  	"regexp"
    25  	"strings"
    26  	"time"
    28  	"github.com/onsi/ginkgo/v2"
    29  	"github.com/onsi/gomega"
    31  	v1 "k8s.io/api/core/v1"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/util/intstr"
    34  	"k8s.io/apimachinery/pkg/util/wait"
    35  	"k8s.io/kubernetes/test/e2e/framework"
    36  	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
    37  	e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
    38  	e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
    39  	"k8s.io/kubernetes/test/e2e/storage/utils"
    40  )
// secondNodePortSvcName is the name of the secondary NodePort service created
// by createSecondNodePortService.
const secondNodePortSvcName = "second-node-port-service"
    45  // GetHTTPContent returns the content of the given url by HTTP.
    46  func GetHTTPContent(host string, port int, timeout time.Duration, url string) (string, error) {
    47  	var body bytes.Buffer
    48  	pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
    49  		result := e2enetwork.PokeHTTP(host, port, url, nil)
    50  		if result.Status == e2enetwork.HTTPSuccess {
    51  			body.Write(result.Body)
    52  			return true, nil
    53  		}
    54  		return false, nil
    55  	})
    56  	if pollErr != nil {
    57  		framework.Logf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
    58  	}
    59  	return body.String(), pollErr
    60  }
    62  // GetHTTPContentFromTestContainer returns the content of the given url by HTTP via a test container.
    63  func GetHTTPContentFromTestContainer(ctx context.Context, config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, dialCmd string) (string, error) {
    64  	var body string
    65  	pollFn := func() (bool, error) {
    66  		resp, err := config.GetResponseFromTestContainer(ctx, "http", dialCmd, host, port)
    67  		if err != nil || len(resp.Errors) > 0 || len(resp.Responses) == 0 {
    68  			return false, nil
    69  		}
    70  		body = resp.Responses[0]
    71  		return true, nil
    72  	}
    73  	if pollErr := wait.PollImmediate(framework.Poll, timeout, pollFn); pollErr != nil {
    74  		return "", pollErr
    75  	}
    76  	return body, nil
    77  }
    79  // DescribeSvc logs the output of kubectl describe svc for the given namespace
    80  func DescribeSvc(ns string) {
    81  	framework.Logf("\nOutput of kubectl describe svc:\n")
    82  	desc, _ := e2ekubectl.RunKubectl(
    83  		ns, "describe", "svc", fmt.Sprintf("--namespace=%v", ns))
    84  	framework.Logf(desc)
    85  }
    87  // CheckSCTPModuleLoadedOnNodes checks whether any node on the list has the
    88  // sctp.ko module loaded
    89  // For security reasons, and also to allow clusters to use userspace SCTP implementations,
    90  // we require that just creating an SCTP Pod/Service/NetworkPolicy must not do anything
    91  // that would cause the sctp kernel module to be loaded.
    92  func CheckSCTPModuleLoadedOnNodes(ctx context.Context, f *framework.Framework, nodes *v1.NodeList) bool {
    93  	hostExec := utils.NewHostExec(f)
    94  	ginkgo.DeferCleanup(hostExec.Cleanup)
    95  	re := regexp.MustCompile(`^\s*sctp\s+`)
    96  	cmd := "lsmod | grep sctp"
    97  	for _, node := range nodes.Items {
    98  		framework.Logf("Executing cmd %q on node %v", cmd, node.Name)
    99  		result, err := hostExec.IssueCommandWithResult(ctx, cmd, &node)
   100  		if err != nil {
   101  			framework.Logf("sctp module is not loaded or error occurred while executing command %s on node: %v", cmd, err)
   102  		}
   103  		for _, line := range strings.Split(result, "\n") {
   104  			if found := re.Find([]byte(line)); found != nil {
   105  				framework.Logf("the sctp module is loaded on node: %v", node.Name)
   106  				return true
   107  			}
   108  		}
   109  		framework.Logf("the sctp module is not loaded on node: %v", node.Name)
   110  	}
   111  	return false
   112  }
   114  // execSourceIPTest executes curl to access "/clientip" endpoint on target address
   115  // from given Pod to check if source ip is preserved.
   116  func execSourceIPTest(sourcePod v1.Pod, targetAddr string) (string, string) {
   117  	var (
   118  		err     error
   119  		stdout  string
   120  		timeout = 2 * time.Minute
   121  	)
   123  	framework.Logf("Waiting up to %v to get response from %s", timeout, targetAddr)
   124  	cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %s/clientip`, targetAddr)
   125  	for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
   126  		stdout, err = e2eoutput.RunHostCmd(sourcePod.Namespace, sourcePod.Name, cmd)
   127  		if err != nil {
   128  			framework.Logf("got err: %v, retry until timeout", err)
   129  			continue
   130  		}
   131  		// Need to check output because it might omit in case of error.
   132  		if strings.TrimSpace(stdout) == "" {
   133  			framework.Logf("got empty stdout, retry until timeout")
   134  			continue
   135  		}
   136  		break
   137  	}
   139  	framework.ExpectNoError(err)
   141  	// The stdout return from RunHostCmd is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
   142  	host, _, err := net.SplitHostPort(stdout)
   143  	if err != nil {
   144  		// ginkgo.Fail the test if output format is unexpected.
   145  		framework.Failf("exec pod returned unexpected stdout: [%v]\n", stdout)
   146  	}
   147  	return sourcePod.Status.PodIP, host
   148  }
   150  // execHostnameTest executes curl to access "/hostname" endpoint on target address
   151  // from given Pod to check the hostname of the target destination.
   152  // It also converts FQDNs to hostnames, so if an FQDN is passed as
   153  // targetHostname only the hostname part will be considered for comparison.
   154  func execHostnameTest(sourcePod v1.Pod, targetAddr, targetHostname string) {
   155  	var (
   156  		err     error
   157  		stdout  string
   158  		timeout = 2 * time.Minute
   159  	)
   161  	framework.Logf("Waiting up to %v to get response from %s", timeout, targetAddr)
   162  	cmd := fmt.Sprintf(`curl -q -s --max-time 30 %s/hostname`, targetAddr)
   163  	for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
   164  		stdout, err = e2eoutput.RunHostCmd(sourcePod.Namespace, sourcePod.Name, cmd)
   165  		if err != nil {
   166  			framework.Logf("got err: %v, retry until timeout", err)
   167  			continue
   168  		}
   169  		// Need to check output because it might omit in case of error.
   170  		if strings.TrimSpace(stdout) == "" {
   171  			framework.Logf("got empty stdout, retry until timeout")
   172  			continue
   173  		}
   174  		break
   175  	}
   177  	// Ensure we're comparing hostnames and not FQDNs
   178  	targetHostname = strings.Split(targetHostname, ".")[0]
   179  	hostname := strings.TrimSpace(strings.Split(stdout, ".")[0])
   181  	framework.ExpectNoError(err)
   182  	gomega.Expect(hostname).To(gomega.Equal(targetHostname))
   183  }
   185  // createSecondNodePortService creates a service with the same selector as config.NodePortService and same HTTP Port
   186  func createSecondNodePortService(ctx context.Context, f *framework.Framework, config *e2enetwork.NetworkingTestConfig) (*v1.Service, int) {
   187  	svc := &v1.Service{
   188  		ObjectMeta: metav1.ObjectMeta{
   189  			Name: secondNodePortSvcName,
   190  		},
   191  		Spec: v1.ServiceSpec{
   192  			Type: v1.ServiceTypeNodePort,
   193  			Ports: []v1.ServicePort{
   194  				{
   195  					Port:       e2enetwork.ClusterHTTPPort,
   196  					Name:       "http",
   197  					Protocol:   v1.ProtocolTCP,
   198  					TargetPort: intstr.FromInt32(e2enetwork.EndpointHTTPPort),
   199  				},
   200  			},
   201  			Selector: config.NodePortService.Spec.Selector,
   202  		},
   203  	}
   205  	createdService := config.CreateService(ctx, svc)
   207  	err := framework.WaitForServiceEndpointsNum(ctx, f.ClientSet, config.Namespace, secondNodePortSvcName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
   208  	framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", secondNodePortSvcName, config.Namespace)
   210  	var httpPort int
   211  	for _, p := range createdService.Spec.Ports {
   212  		switch p.Protocol {
   213  		case v1.ProtocolTCP:
   214  			httpPort = int(p.NodePort)
   215  		default:
   216  			continue
   217  		}
   218  	}
   220  	return createdService, httpPort
   221  }
   223  // testEndpointReachability tests reachability to endpoints (i.e. IP, ServiceName) and ports. Test request is initiated from specified execPod.
   224  // TCP and UDP protocol based service are supported at this moment
   225  func testEndpointReachability(ctx context.Context, endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod, timeout time.Duration) error {
   226  	cmd := ""
   227  	switch protocol {
   228  	case v1.ProtocolTCP:
   229  		cmd = fmt.Sprintf("echo hostName | nc -v -t -w 2 %s %v", endpoint, port)
   230  	case v1.ProtocolUDP:
   231  		cmd = fmt.Sprintf("echo hostName | nc -v -u -w 2 %s %v", endpoint, port)
   232  	default:
   233  		return fmt.Errorf("service reachability check is not supported for %v", protocol)
   234  	}
   236  	err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
   237  		stdout, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
   238  		if err != nil {
   239  			framework.Logf("Service reachability failing with error: %v\nRetrying...", err)
   240  			return false, nil
   241  		}
   242  		trimmed := strings.TrimSpace(stdout)
   243  		if trimmed != "" {
   244  			return true, nil
   245  		}
   246  		return false, nil
   247  	})
   248  	if err != nil {
   249  		return fmt.Errorf("service is not reachable within %v timeout on endpoint %s %d over %s protocol", timeout, endpoint, port, protocol)
   250  	}
   251  	return nil
   252  }

