1
16
17 package network
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "net"
24 "regexp"
25 "strings"
26 "time"
27
28 "github.com/onsi/ginkgo/v2"
29 "github.com/onsi/gomega"
30
31 v1 "k8s.io/api/core/v1"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/util/intstr"
34 "k8s.io/apimachinery/pkg/util/wait"
35 "k8s.io/kubernetes/test/e2e/framework"
36 e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
37 e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
38 e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
39 "k8s.io/kubernetes/test/e2e/storage/utils"
40 )
41
42
43 const secondNodePortSvcName = "second-node-port-service"
44
45
46 func GetHTTPContent(host string, port int, timeout time.Duration, url string) (string, error) {
47 var body bytes.Buffer
48 pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
49 result := e2enetwork.PokeHTTP(host, port, url, nil)
50 if result.Status == e2enetwork.HTTPSuccess {
51 body.Write(result.Body)
52 return true, nil
53 }
54 return false, nil
55 })
56 if pollErr != nil {
57 framework.Logf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
58 }
59 return body.String(), pollErr
60 }
61
62
63 func GetHTTPContentFromTestContainer(ctx context.Context, config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, dialCmd string) (string, error) {
64 var body string
65 pollFn := func() (bool, error) {
66 resp, err := config.GetResponseFromTestContainer(ctx, "http", dialCmd, host, port)
67 if err != nil || len(resp.Errors) > 0 || len(resp.Responses) == 0 {
68 return false, nil
69 }
70 body = resp.Responses[0]
71 return true, nil
72 }
73 if pollErr := wait.PollImmediate(framework.Poll, timeout, pollFn); pollErr != nil {
74 return "", pollErr
75 }
76 return body, nil
77 }
78
79
80 func DescribeSvc(ns string) {
81 framework.Logf("\nOutput of kubectl describe svc:\n")
82 desc, _ := e2ekubectl.RunKubectl(
83 ns, "describe", "svc", fmt.Sprintf("--namespace=%v", ns))
84 framework.Logf(desc)
85 }
86
87
88
89
90
91
92 func CheckSCTPModuleLoadedOnNodes(ctx context.Context, f *framework.Framework, nodes *v1.NodeList) bool {
93 hostExec := utils.NewHostExec(f)
94 ginkgo.DeferCleanup(hostExec.Cleanup)
95 re := regexp.MustCompile(`^\s*sctp\s+`)
96 cmd := "lsmod | grep sctp"
97 for _, node := range nodes.Items {
98 framework.Logf("Executing cmd %q on node %v", cmd, node.Name)
99 result, err := hostExec.IssueCommandWithResult(ctx, cmd, &node)
100 if err != nil {
101 framework.Logf("sctp module is not loaded or error occurred while executing command %s on node: %v", cmd, err)
102 }
103 for _, line := range strings.Split(result, "\n") {
104 if found := re.Find([]byte(line)); found != nil {
105 framework.Logf("the sctp module is loaded on node: %v", node.Name)
106 return true
107 }
108 }
109 framework.Logf("the sctp module is not loaded on node: %v", node.Name)
110 }
111 return false
112 }
113
114
115
116 func execSourceIPTest(sourcePod v1.Pod, targetAddr string) (string, string) {
117 var (
118 err error
119 stdout string
120 timeout = 2 * time.Minute
121 )
122
123 framework.Logf("Waiting up to %v to get response from %s", timeout, targetAddr)
124 cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %s/clientip`, targetAddr)
125 for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
126 stdout, err = e2eoutput.RunHostCmd(sourcePod.Namespace, sourcePod.Name, cmd)
127 if err != nil {
128 framework.Logf("got err: %v, retry until timeout", err)
129 continue
130 }
131
132 if strings.TrimSpace(stdout) == "" {
133 framework.Logf("got empty stdout, retry until timeout")
134 continue
135 }
136 break
137 }
138
139 framework.ExpectNoError(err)
140
141
142 host, _, err := net.SplitHostPort(stdout)
143 if err != nil {
144
145 framework.Failf("exec pod returned unexpected stdout: [%v]\n", stdout)
146 }
147 return sourcePod.Status.PodIP, host
148 }
149
150
151
152
153
154 func execHostnameTest(sourcePod v1.Pod, targetAddr, targetHostname string) {
155 var (
156 err error
157 stdout string
158 timeout = 2 * time.Minute
159 )
160
161 framework.Logf("Waiting up to %v to get response from %s", timeout, targetAddr)
162 cmd := fmt.Sprintf(`curl -q -s --max-time 30 %s/hostname`, targetAddr)
163 for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
164 stdout, err = e2eoutput.RunHostCmd(sourcePod.Namespace, sourcePod.Name, cmd)
165 if err != nil {
166 framework.Logf("got err: %v, retry until timeout", err)
167 continue
168 }
169
170 if strings.TrimSpace(stdout) == "" {
171 framework.Logf("got empty stdout, retry until timeout")
172 continue
173 }
174 break
175 }
176
177
178 targetHostname = strings.Split(targetHostname, ".")[0]
179 hostname := strings.TrimSpace(strings.Split(stdout, ".")[0])
180
181 framework.ExpectNoError(err)
182 gomega.Expect(hostname).To(gomega.Equal(targetHostname))
183 }
184
185
186 func createSecondNodePortService(ctx context.Context, f *framework.Framework, config *e2enetwork.NetworkingTestConfig) (*v1.Service, int) {
187 svc := &v1.Service{
188 ObjectMeta: metav1.ObjectMeta{
189 Name: secondNodePortSvcName,
190 },
191 Spec: v1.ServiceSpec{
192 Type: v1.ServiceTypeNodePort,
193 Ports: []v1.ServicePort{
194 {
195 Port: e2enetwork.ClusterHTTPPort,
196 Name: "http",
197 Protocol: v1.ProtocolTCP,
198 TargetPort: intstr.FromInt32(e2enetwork.EndpointHTTPPort),
199 },
200 },
201 Selector: config.NodePortService.Spec.Selector,
202 },
203 }
204
205 createdService := config.CreateService(ctx, svc)
206
207 err := framework.WaitForServiceEndpointsNum(ctx, f.ClientSet, config.Namespace, secondNodePortSvcName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
208 framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", secondNodePortSvcName, config.Namespace)
209
210 var httpPort int
211 for _, p := range createdService.Spec.Ports {
212 switch p.Protocol {
213 case v1.ProtocolTCP:
214 httpPort = int(p.NodePort)
215 default:
216 continue
217 }
218 }
219
220 return createdService, httpPort
221 }
222
223
224
225 func testEndpointReachability(ctx context.Context, endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod, timeout time.Duration) error {
226 cmd := ""
227 switch protocol {
228 case v1.ProtocolTCP:
229 cmd = fmt.Sprintf("echo hostName | nc -v -t -w 2 %s %v", endpoint, port)
230 case v1.ProtocolUDP:
231 cmd = fmt.Sprintf("echo hostName | nc -v -u -w 2 %s %v", endpoint, port)
232 default:
233 return fmt.Errorf("service reachability check is not supported for %v", protocol)
234 }
235
236 err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
237 stdout, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
238 if err != nil {
239 framework.Logf("Service reachability failing with error: %v\nRetrying...", err)
240 return false, nil
241 }
242 trimmed := strings.TrimSpace(stdout)
243 if trimmed != "" {
244 return true, nil
245 }
246 return false, nil
247 })
248 if err != nil {
249 return fmt.Errorf("service is not reachable within %v timeout on endpoint %s %d over %s protocol", timeout, endpoint, port, protocol)
250 }
251 return nil
252 }
253
View as plain text