1
16
17 package network
18
19 import (
20 "context"
21 "crypto/tls"
22 "encoding/json"
23 "fmt"
24 "io"
25 "net"
26 "net/http"
27 "strconv"
28 "strings"
29 "time"
30
31 "github.com/onsi/ginkgo/v2"
32 v1 "k8s.io/api/core/v1"
33 apierrors "k8s.io/apimachinery/pkg/api/errors"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/util/intstr"
36 utilnet "k8s.io/apimachinery/pkg/util/net"
37 "k8s.io/apimachinery/pkg/util/sets"
38 "k8s.io/apimachinery/pkg/util/uuid"
39 "k8s.io/apimachinery/pkg/util/wait"
40 clientset "k8s.io/client-go/kubernetes"
41 coreclientset "k8s.io/client-go/kubernetes/typed/core/v1"
42 "k8s.io/kubernetes/test/e2e/framework"
43 e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
44 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
45 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
46 e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
47 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
48 e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
49 imageutils "k8s.io/kubernetes/test/utils/image"
50 netutils "k8s.io/utils/net"
51 )
52
const (
	// EndpointHTTPPort is the HTTP port netexec listens on in the endpoint
	// (netserver) pods; it is also the target port of the services' "http" port.
	EndpointHTTPPort = 8083
	// EndpointUDPPort is the UDP port netexec listens on in the endpoint pods.
	EndpointUDPPort = 8081
	// EndpointSCTPPort is the SCTP port netexec listens on in the endpoint pods
	// when SCTP is enabled.
	EndpointSCTPPort = 8082
	// testContainerHTTPPort is the HTTP port of the netexec server in the test
	// pod; /dial requests are sent to it.
	testContainerHTTPPort = 9080
	// ClusterHTTPPort is the service port that forwards to EndpointHTTPPort.
	ClusterHTTPPort = 80
	// ClusterUDPPort is the service port that forwards to EndpointUDPPort.
	ClusterUDPPort = 90
	// ClusterSCTPPort is the service port that forwards to EndpointSCTPPort.
	ClusterSCTPPort            = 95
	testPodName                = "test-container-pod"
	hostTestPodName            = "host-test-container-pod"
	nodePortServiceName        = "node-port-service"
	sessionAffinityServiceName = "session-affinity-service"

	// hitEndpointRetryDelay is the pause between consecutive attempts to reach
	// an endpoint.
	hitEndpointRetryDelay = 2 * time.Second

	// testTries is the base attempt budget; setupCore adds the square of the
	// endpoint count to it to compute MaxTries.
	testTries = 30
	// maxNetProxyPodsCount caps the number of nodes (and therefore netserver
	// endpoint pods) used by createNetProxyPods.
	maxNetProxyPodsCount = 10
	// SessionAffinityChecks is not referenced in this part of the file;
	// presumably the number of requests used to verify session affinity.
	SessionAffinityChecks = 10
	// RegexIPv4 loosely matches a dotted-quad IPv4 address (octet ranges are
	// not validated).
	RegexIPv4 = "(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)"

	// RegexIPv6 matches textual IPv6 addresses, including the compressed "::"
	// forms and forms ending in an embedded dotted IPv4 address.
	RegexIPv6 = "(?:(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){6})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:::(?:(?:(?:[0-9a-fA-F]{1,4})):){5})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){4})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,1}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){3})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,2}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){2})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,3}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:[0-9a-fA-F]{1,4})):)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,4}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,5}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,6}(?:(?:[0-9a-fA-F]{1,4})))?::))))"

	// resizeNodeReadyTimeout and resizeNodeNotReadyTimeout are not referenced in
	// this part of the file; presumably used by node-resize tests.
	resizeNodeReadyTimeout    = 2 * time.Minute
	resizeNodeNotReadyTimeout = 2 * time.Minute

	// echoHostname is the netexec dial command whose replies are the
	// responders' hostnames; dials using it trigger endpoint diagnostics on
	// failure.
	echoHostname = "hostname"
)
92
93
// NetexecImageName is the agnhost image used to run the netexec servers in the
// endpoint pods and in the test pod.
var NetexecImageName = imageutils.GetE2EImage(imageutils.Agnhost)

// Option customizes a NetworkingTestConfig; options are applied, in order,
// before NewNetworkingTestConfig runs the setup.
type Option func(*NetworkingTestConfig)
98
99
100 func EnableSCTP(config *NetworkingTestConfig) {
101 config.SCTPEnabled = true
102 }
103
104
105 func EnableDualStack(config *NetworkingTestConfig) {
106 config.DualStackEnabled = true
107 }
108
109
110 func UseHostNetwork(config *NetworkingTestConfig) {
111 config.HostNetwork = true
112 }
113
114
115 func EndpointsUseHostNetwork(config *NetworkingTestConfig) {
116 config.EndpointsHostNetwork = true
117 }
118
119
120 func PreferExternalAddresses(config *NetworkingTestConfig) {
121 config.PreferExternalAddresses = true
122 }
123
124
125 func NewNetworkingTestConfig(ctx context.Context, f *framework.Framework, setters ...Option) *NetworkingTestConfig {
126
127 config := &NetworkingTestConfig{
128 f: f,
129 Namespace: f.Namespace.Name,
130 }
131 for _, setter := range setters {
132 setter(config)
133 }
134 ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
135 config.setup(ctx, getServiceSelector())
136 return config
137 }
138
139
140 func NewCoreNetworkingTestConfig(ctx context.Context, f *framework.Framework, hostNetwork bool) *NetworkingTestConfig {
141
142 config := &NetworkingTestConfig{
143 f: f,
144 Namespace: f.Namespace.Name,
145 HostNetwork: hostNetwork,
146 }
147 ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
148 config.setupCore(ctx, getServiceSelector())
149 return config
150 }
151
152 func getServiceSelector() map[string]string {
153 ginkgo.By("creating a selector")
154 selectorName := "selector-" + string(uuid.NewUUID())
155 serviceSelector := map[string]string{
156 selectorName: "true",
157 }
158 return serviceSelector
159 }
160
161
162
// NetworkingTestConfig holds the pods, services and addresses shared by the
// networking e2e tests, together with the options that shaped their creation.
type NetworkingTestConfig struct {
	// TestContainerPod is the test pod (testPodName) running netexec on
	// testContainerHTTPPort; dial requests are exec'd from it.
	TestContainerPod *v1.Pod
	// HostTestContainerPod is the exec pod (hostTestPodName); it is only
	// created and populated when HostNetwork is true.
	HostTestContainerPod *v1.Pod
	// HostNetwork controls whether HostTestContainerPod is created.
	HostNetwork bool
	// EndpointsHostNetwork runs the endpoint pods with host networking.
	EndpointsHostNetwork bool

	// SCTPEnabled adds SCTP ports to the endpoint pods and the services.
	SCTPEnabled bool
	// DualStackEnabled makes the services require dual-stack and populates
	// SecondaryClusterIP and SecondaryNodeIP.
	DualStackEnabled bool

	// EndpointPods are the ready netserver pods backing the services, one per
	// selected node.
	EndpointPods []*v1.Pod
	f            *framework.Framework
	podClient    *e2epod.PodClient

	// NodePortService is the NodePort service selecting EndpointPods.
	NodePortService *v1.Service

	// SessionAffinityService is a NodePort service selecting EndpointPods with
	// ClientIP session affinity.
	SessionAffinityService *v1.Service
	// Nodes are the ready, schedulable nodes found during setup.
	Nodes []v1.Node

	// MaxTries is the polling budget: len(EndpointPods)^2 + testTries.
	MaxTries int
	// ClusterIP is NodePortService's primary cluster IP.
	ClusterIP string
	// SecondaryClusterIP is NodePortService's second cluster IP (dual-stack only).
	SecondaryClusterIP string

	// NodeIP is a node address in the same IP family as ClusterIP.
	NodeIP string

	// SecondaryNodeIP is a node address in the other IP family (dual-stack only).
	SecondaryNodeIP string
	// NodeHTTPPort, NodeUDPPort and NodeSCTPPort are the node ports allocated
	// to NodePortService for TCP, UDP and SCTP respectively.
	NodeHTTPPort int
	NodeUDPPort  int
	NodeSCTPPort int

	// Namespace is the namespace all test objects are created in.
	Namespace string
	// PreferExternalAddresses makes setup prefer node ExternalIP addresses
	// over InternalIP addresses.
	PreferExternalAddresses bool
}
215
216
// NetexecDialResponse is the JSON body printed by curl for a netexec /dial
// request.
type NetexecDialResponse struct {
	// Responses holds the replies netexec relayed from the dialed target.
	Responses []string `json:"responses"`
	// Errors presumably holds the errors netexec hit while dialing — confirm
	// against the agnhost netexec implementation.
	Errors []string `json:"errors"`
}
221
222
223 func (config *NetworkingTestConfig) DialFromEndpointContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
224 return config.DialFromContainer(ctx, protocol, echoHostname, config.EndpointPods[0].Status.PodIP, targetIP, EndpointHTTPPort, targetPort, maxTries, minTries, expectedEps)
225 }
226
227
228 func (config *NetworkingTestConfig) DialFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
229 return config.DialFromContainer(ctx, protocol, echoHostname, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedEps)
230 }
231
232
233 func (config *NetworkingTestConfig) DialEchoFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, echoMessage string) error {
234 expectedResponse := sets.NewString()
235 expectedResponse.Insert(echoMessage)
236 var dialCommand string
237
238
239
240
241
242 if protocol == "http" {
243 dialCommand = fmt.Sprintf("echo?msg=%s", echoMessage)
244 } else {
245 dialCommand = fmt.Sprintf("echo%%20%s", echoMessage)
246 }
247 return config.DialFromContainer(ctx, protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedResponse)
248 }
249
250
251
252
253 func (config *NetworkingTestConfig) diagnoseMissingEndpoints(foundEndpoints sets.String) {
254 for _, e := range config.EndpointPods {
255 if foundEndpoints.Has(e.Name) {
256 continue
257 }
258 framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name)
259 desc, _ := e2ekubectl.RunKubectl(
260 e.Namespace, "describe", "pod", e.Name, fmt.Sprintf("--namespace=%v", e.Namespace))
261 framework.Logf(desc)
262 }
263 }
264
265
266 func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
267 expectedEps := sets.NewString()
268 for _, p := range config.EndpointPods {
269 if config.EndpointsHostNetwork {
270 expectedEps.Insert(p.Spec.NodeSelector["kubernetes.io/hostname"])
271 } else {
272 expectedEps.Insert(p.Name)
273 }
274 }
275 return expectedEps
276 }
277
// makeCURLDialCommand builds the shell command that asks the netexec server at
// ipPort to dial targetIP:targetPort once over protocol, relaying dialCmd.
// curl's -g flag keeps bracketed IPv6 literals from being read as globs.
func makeCURLDialCommand(ipPort, dialCmd, protocol, targetIP string, targetPort int) string {
	query := fmt.Sprintf("request=%s&protocol=%s&host=%s&port=%d&tries=1", dialCmd, protocol, targetIP, targetPort)
	return fmt.Sprintf("curl -g -q -s 'http://%s/dial?%s'", ipPort, query)
}
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310 func (config *NetworkingTestConfig) DialFromContainer(ctx context.Context, protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort, maxTries, minTries int, expectedResponses sets.String) error {
311 ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
312 cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)
313
314 responses := sets.NewString()
315
316 for i := 0; i < maxTries; i++ {
317 resp, err := config.GetResponseFromContainer(ctx, protocol, dialCommand, containerIP, targetIP, containerHTTPPort, targetPort)
318 if err != nil {
319
320
321
322 framework.Logf("GetResponseFromContainer: %s", err)
323 continue
324 }
325 for _, response := range resp.Responses {
326 trimmed := strings.TrimSpace(response)
327 if trimmed != "" {
328 responses.Insert(trimmed)
329 }
330 }
331 framework.Logf("Waiting for responses: %v", expectedResponses.Difference(responses))
332
333
334 if (responses.Equal(expectedResponses) || responses.Len() == 0 && expectedResponses.Len() == 0) && i+1 >= minTries {
335 framework.Logf("reached %v after %v/%v tries", targetIP, i, maxTries)
336 return nil
337 }
338
339 time.Sleep(hitEndpointRetryDelay)
340 }
341 if dialCommand == echoHostname {
342 config.diagnoseMissingEndpoints(responses)
343 }
344 returnMsg := fmt.Errorf("did not find expected responses... \nTries %d\nCommand %v\nretrieved %v\nexpected %v", maxTries, cmd, responses, expectedResponses)
345 framework.Logf("encountered error during dial (%v)", returnMsg)
346 return returnMsg
347
348 }
349
350
351 func (config *NetworkingTestConfig) GetEndpointsFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, tries int) (sets.String, error) {
352 return config.GetEndpointsFromContainer(ctx, protocol, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, tries)
353 }
354
355
356
357
358
359
360 func (config *NetworkingTestConfig) GetEndpointsFromContainer(ctx context.Context, protocol, containerIP, targetIP string, containerHTTPPort, targetPort, tries int) (sets.String, error) {
361 ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
362 cmd := makeCURLDialCommand(ipPort, "hostName", protocol, targetIP, targetPort)
363
364 eps := sets.NewString()
365
366 for i := 0; i < tries; i++ {
367 stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
368 if err != nil {
369
370
371
372 framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
373 } else {
374 podInfo := fmt.Sprintf("name: %v, namespace: %v, hostIp: %v, podIp: %v, conditions: %v", config.TestContainerPod.Name, config.TestContainerPod.Namespace, config.TestContainerPod.Status.HostIP, config.TestContainerPod.Status.PodIP, config.TestContainerPod.Status.Conditions)
375 framework.Logf("Tries: %d, in try: %d, stdout: %v, stderr: %v, command run in Pod { %#v }", tries, i, stdout, stderr, podInfo)
376
377 var output NetexecDialResponse
378 if err := json.Unmarshal([]byte(stdout), &output); err != nil {
379 framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
380 cmd, config.TestContainerPod.Name, stdout, err)
381 continue
382 }
383
384 for _, hostName := range output.Responses {
385 trimmed := strings.TrimSpace(hostName)
386 if trimmed != "" {
387 eps.Insert(trimmed)
388 }
389 }
390
391 time.Sleep(hitEndpointRetryDelay)
392 }
393 }
394 return eps, nil
395 }
396
397
398 func (config *NetworkingTestConfig) GetResponseFromContainer(ctx context.Context, protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort int) (NetexecDialResponse, error) {
399 ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
400 cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)
401
402 stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
403 if err != nil {
404 return NetexecDialResponse{}, fmt.Errorf("failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
405 }
406
407 var output NetexecDialResponse
408 if err := json.Unmarshal([]byte(stdout), &output); err != nil {
409 return NetexecDialResponse{}, fmt.Errorf("failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
410 cmd, config.TestContainerPod.Name, stdout, err)
411 }
412 return output, nil
413 }
414
415
416 func (config *NetworkingTestConfig) GetResponseFromTestContainer(ctx context.Context, protocol, dialCommand, targetIP string, targetPort int) (NetexecDialResponse, error) {
417 return config.GetResponseFromContainer(ctx, protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort)
418 }
419
420
421 func (config *NetworkingTestConfig) GetHTTPCodeFromTestContainer(ctx context.Context, path, targetIP string, targetPort int) (int, error) {
422 cmd := fmt.Sprintf("curl -g -q -s -o /dev/null -w %%{http_code} http://%s:%d%s",
423 targetIP,
424 targetPort,
425 path)
426 stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
427
428
429
430
431 if err != nil && len(stdout) == 0 {
432 return 0, fmt.Errorf("failed to execute %q: %v, stderr: %q", cmd, err, stderr)
433 }
434 code, err := strconv.Atoi(stdout)
435 if err != nil {
436 return 0, fmt.Errorf("failed to parse status code returned by healthz endpoint: %w, code: %s", err, stdout)
437 }
438 return code, nil
439 }
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454 func (config *NetworkingTestConfig) DialFromNode(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
455 var cmd string
456 if protocol == "udp" {
457 cmd = fmt.Sprintf("echo hostName | nc -w 1 -u %s %d", targetIP, targetPort)
458 } else {
459 ipPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
460
461
462
463 cmd = fmt.Sprintf("curl -g -q -s --max-time 15 --connect-timeout 1 http://%s/hostName", ipPort)
464 }
465
466
467
468
469 eps := sets.NewString()
470
471 filterCmd := fmt.Sprintf("%s | grep -v '^\\s*$'", cmd)
472 framework.Logf("Going to poll %v on port %v at least %v times, with a maximum of %v tries before failing", targetIP, targetPort, minTries, maxTries)
473 for i := 0; i < maxTries; i++ {
474 stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.HostTestContainerPod.Name, filterCmd)
475 if err != nil || len(stderr) > 0 {
476
477
478
479 framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", filterCmd, err, stdout, stderr)
480 } else {
481 trimmed := strings.TrimSpace(stdout)
482 if trimmed != "" {
483 eps.Insert(trimmed)
484 }
485 }
486
487
488 if eps.Equal(expectedEps) && i+1 >= minTries {
489 framework.Logf("Found all %d expected endpoints: %+v", eps.Len(), eps.List())
490 return nil
491 }
492
493 framework.Logf("Waiting for %+v endpoints (expected=%+v, actual=%+v)", expectedEps.Difference(eps).List(), expectedEps.List(), eps.List())
494
495
496 time.Sleep(hitEndpointRetryDelay)
497 }
498
499 config.diagnoseMissingEndpoints(eps)
500 return fmt.Errorf("failed to find expected endpoints, \ntries %d\nCommand %v\nretrieved %v\nexpected %v", maxTries, cmd, eps, expectedEps)
501 }
502
503
504
505
506 func (config *NetworkingTestConfig) GetSelfURL(ctx context.Context, port int32, path string, expected string) {
507 cmd := fmt.Sprintf("curl -i -q -s --connect-timeout 1 http://localhost:%d%s", port, path)
508 ginkgo.By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
509 config.executeCurlCmd(ctx, cmd, expected)
510 }
511
512
513
514
515 func (config *NetworkingTestConfig) GetSelfURLStatusCode(ctx context.Context, port int32, path string, expected string) {
516
517 cmd := fmt.Sprintf("curl -o /dev/null -i -q -s -w %%{http_code} --connect-timeout 1 http://localhost:%d%s", port, path)
518 ginkgo.By(fmt.Sprintf("Checking status code against http://localhost:%d%s", port, path))
519 config.executeCurlCmd(ctx, cmd, expected)
520 }
521
522 func (config *NetworkingTestConfig) executeCurlCmd(ctx context.Context, cmd string, expected string) {
523
524
525 const retryInterval = 1 * time.Second
526 const retryTimeout = 30 * time.Second
527 podName := config.HostTestContainerPod.Name
528 var msg string
529 if pollErr := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(ctx context.Context) (bool, error) {
530 stdout, err := e2epodoutput.RunHostCmd(config.Namespace, podName, cmd)
531 if err != nil {
532 msg = fmt.Sprintf("failed executing cmd %v in %v/%v: %v", cmd, config.Namespace, podName, err)
533 framework.Logf(msg)
534 return false, nil
535 }
536 if !strings.Contains(stdout, expected) {
537 msg = fmt.Sprintf("successfully executed %v in %v/%v, but output '%v' doesn't contain expected string '%v'", cmd, config.Namespace, podName, stdout, expected)
538 framework.Logf(msg)
539 return false, nil
540 }
541 return true, nil
542 }); pollErr != nil {
543 framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", config.Namespace, podName)
544 desc, _ := e2ekubectl.RunKubectl(
545 config.Namespace, "describe", "pod", podName, fmt.Sprintf("--namespace=%v", config.Namespace))
546 framework.Logf("%s", desc)
547 framework.Failf("Timed out in %v: %v", retryTimeout, msg)
548 }
549 }
550
551 func (config *NetworkingTestConfig) createNetShellPodSpec(podName, hostname string) *v1.Pod {
552 netexecArgs := []string{
553 "netexec",
554 fmt.Sprintf("--http-port=%d", EndpointHTTPPort),
555 fmt.Sprintf("--udp-port=%d", EndpointUDPPort),
556 }
557
558
559
560 if config.EndpointsHostNetwork {
561 netexecArgs = append(netexecArgs, "--udp-listen-addresses=$(HOST_IP),$(POD_IPS)")
562 }
563
564 probe := &v1.Probe{
565 InitialDelaySeconds: 10,
566 TimeoutSeconds: 30,
567 PeriodSeconds: 10,
568 SuccessThreshold: 1,
569 FailureThreshold: 3,
570 ProbeHandler: v1.ProbeHandler{
571 HTTPGet: &v1.HTTPGetAction{
572 Path: "/healthz",
573 Port: intstr.IntOrString{IntVal: EndpointHTTPPort},
574 },
575 },
576 }
577 pod := &v1.Pod{
578 TypeMeta: metav1.TypeMeta{
579 Kind: "Pod",
580 APIVersion: "v1",
581 },
582 ObjectMeta: metav1.ObjectMeta{
583 Name: podName,
584 Namespace: config.Namespace,
585 },
586 Spec: v1.PodSpec{
587 Containers: []v1.Container{
588 {
589 Name: "webserver",
590 Image: NetexecImageName,
591 ImagePullPolicy: v1.PullIfNotPresent,
592 Args: netexecArgs,
593 Ports: []v1.ContainerPort{
594 {
595 Name: "http",
596 ContainerPort: EndpointHTTPPort,
597 },
598 {
599 Name: "udp",
600 ContainerPort: EndpointUDPPort,
601 Protocol: v1.ProtocolUDP,
602 },
603 },
604 LivenessProbe: probe,
605 ReadinessProbe: probe,
606 },
607 },
608 NodeSelector: map[string]string{
609 "kubernetes.io/hostname": hostname,
610 },
611 },
612 }
613
614 if config.SCTPEnabled {
615 pod.Spec.Containers[0].Args = append(pod.Spec.Containers[0].Args, fmt.Sprintf("--sctp-port=%d", EndpointSCTPPort))
616 pod.Spec.Containers[0].Ports = append(pod.Spec.Containers[0].Ports, v1.ContainerPort{
617 Name: "sctp",
618 ContainerPort: EndpointSCTPPort,
619 Protocol: v1.ProtocolSCTP,
620 })
621 }
622
623 if config.EndpointsHostNetwork {
624 pod.Spec.Containers[0].Env = []v1.EnvVar{
625 {
626 Name: "HOST_IP",
627 ValueFrom: &v1.EnvVarSource{
628 FieldRef: &v1.ObjectFieldSelector{
629 FieldPath: "status.hostIP",
630 },
631 },
632 },
633 {
634 Name: "POD_IPS",
635 ValueFrom: &v1.EnvVarSource{
636 FieldRef: &v1.ObjectFieldSelector{
637 FieldPath: "status.podIPs",
638 },
639 },
640 },
641 }
642 }
643 return pod
644 }
645
646 func (config *NetworkingTestConfig) createTestPodSpec() *v1.Pod {
647 pod := &v1.Pod{
648 TypeMeta: metav1.TypeMeta{
649 Kind: "Pod",
650 APIVersion: "v1",
651 },
652 ObjectMeta: metav1.ObjectMeta{
653 Name: testPodName,
654 Namespace: config.Namespace,
655 },
656 Spec: v1.PodSpec{
657 Containers: []v1.Container{
658 {
659 Name: "webserver",
660 Image: NetexecImageName,
661 ImagePullPolicy: v1.PullIfNotPresent,
662 Args: []string{
663 "netexec",
664 fmt.Sprintf("--http-port=%d", testContainerHTTPPort),
665 },
666 Ports: []v1.ContainerPort{
667 {
668 Name: "http",
669 ContainerPort: testContainerHTTPPort,
670 },
671 },
672 },
673 },
674 },
675 }
676 return pod
677 }
678
679 func (config *NetworkingTestConfig) createNodePortServiceSpec(svcName string, selector map[string]string, enableSessionAffinity bool) *v1.Service {
680 sessionAffinity := v1.ServiceAffinityNone
681 if enableSessionAffinity {
682 sessionAffinity = v1.ServiceAffinityClientIP
683 }
684 res := &v1.Service{
685 ObjectMeta: metav1.ObjectMeta{
686 Name: svcName,
687 },
688 Spec: v1.ServiceSpec{
689 Type: v1.ServiceTypeNodePort,
690 Ports: []v1.ServicePort{
691 {Port: ClusterHTTPPort, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(EndpointHTTPPort)},
692 {Port: ClusterUDPPort, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(EndpointUDPPort)},
693 },
694 Selector: selector,
695 SessionAffinity: sessionAffinity,
696 },
697 }
698
699 if config.SCTPEnabled {
700 res.Spec.Ports = append(res.Spec.Ports, v1.ServicePort{Port: ClusterSCTPPort, Name: "sctp", Protocol: v1.ProtocolSCTP, TargetPort: intstr.FromInt32(EndpointSCTPPort)})
701 }
702 if config.DualStackEnabled {
703 requireDual := v1.IPFamilyPolicyRequireDualStack
704 res.Spec.IPFamilyPolicy = &requireDual
705 }
706 return res
707 }
708
709 func (config *NetworkingTestConfig) createNodePortService(ctx context.Context, selector map[string]string) {
710 config.NodePortService = config.CreateService(ctx, config.createNodePortServiceSpec(nodePortServiceName, selector, false))
711 }
712
713 func (config *NetworkingTestConfig) createSessionAffinityService(ctx context.Context, selector map[string]string) {
714 config.SessionAffinityService = config.CreateService(ctx, config.createNodePortServiceSpec(sessionAffinityServiceName, selector, true))
715 }
716
717
718 func (config *NetworkingTestConfig) DeleteNodePortService(ctx context.Context) {
719 err := config.getServiceClient().Delete(ctx, config.NodePortService.Name, metav1.DeleteOptions{})
720 framework.ExpectNoError(err, "error while deleting NodePortService. err:%v)", err)
721 time.Sleep(15 * time.Second)
722 }
723
724 func (config *NetworkingTestConfig) createTestPods(ctx context.Context) {
725 testContainerPod := config.createTestPodSpec()
726 hostTestContainerPod := e2epod.NewExecPodSpec(config.Namespace, hostTestPodName, config.HostNetwork)
727
728 config.createPod(ctx, testContainerPod)
729 if config.HostNetwork {
730 config.createPod(ctx, hostTestContainerPod)
731 }
732
733 framework.ExpectNoError(e2epod.WaitForPodNameRunningInNamespace(ctx, config.f.ClientSet, testContainerPod.Name, config.f.Namespace.Name))
734
735 var err error
736 config.TestContainerPod, err = config.getPodClient().Get(ctx, testContainerPod.Name, metav1.GetOptions{})
737 if err != nil {
738 framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
739 }
740
741 if config.HostNetwork {
742 framework.ExpectNoError(e2epod.WaitForPodNameRunningInNamespace(ctx, config.f.ClientSet, hostTestContainerPod.Name, config.f.Namespace.Name))
743 config.HostTestContainerPod, err = config.getPodClient().Get(ctx, hostTestContainerPod.Name, metav1.GetOptions{})
744 if err != nil {
745 framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
746 }
747 }
748 }
749
750
751 func (config *NetworkingTestConfig) CreateService(ctx context.Context, serviceSpec *v1.Service) *v1.Service {
752 _, err := config.getServiceClient().Create(ctx, serviceSpec, metav1.CreateOptions{})
753 framework.ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
754
755 err = WaitForService(ctx, config.f.ClientSet, config.Namespace, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
756 framework.ExpectNoError(err, fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))
757
758 createdService, err := config.getServiceClient().Get(ctx, serviceSpec.Name, metav1.GetOptions{})
759 framework.ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
760
761 return createdService
762 }
763
764
765
766 func (config *NetworkingTestConfig) setupCore(ctx context.Context, selector map[string]string) {
767 ginkgo.By("Creating the service pods in kubernetes")
768 podName := "netserver"
769 config.EndpointPods = config.createNetProxyPods(ctx, podName, selector)
770
771 ginkgo.By("Creating test pods")
772 config.createTestPods(ctx)
773
774 epCount := len(config.EndpointPods)
775
776
777
778 config.MaxTries = epCount*epCount + testTries
779 framework.Logf("Setting MaxTries for pod polling to %v for networking test based on endpoint count %v", config.MaxTries, epCount)
780 }
781
782
783 func (config *NetworkingTestConfig) setup(ctx context.Context, selector map[string]string) {
784 config.setupCore(ctx, selector)
785
786 ginkgo.By("Getting node addresses")
787 framework.ExpectNoError(e2enode.WaitForAllNodesSchedulable(ctx, config.f.ClientSet, 10*time.Minute))
788 nodeList, err := e2enode.GetReadySchedulableNodes(ctx, config.f.ClientSet)
789 framework.ExpectNoError(err)
790
791 e2eskipper.SkipUnlessNodeCountIsAtLeast(2)
792 config.Nodes = nodeList.Items
793
794 ginkgo.By("Creating the service on top of the pods in kubernetes")
795 config.createNodePortService(ctx, selector)
796 config.createSessionAffinityService(ctx, selector)
797
798 for _, p := range config.NodePortService.Spec.Ports {
799 switch p.Protocol {
800 case v1.ProtocolUDP:
801 config.NodeUDPPort = int(p.NodePort)
802 case v1.ProtocolTCP:
803 config.NodeHTTPPort = int(p.NodePort)
804 case v1.ProtocolSCTP:
805 config.NodeSCTPPort = int(p.NodePort)
806 default:
807 continue
808 }
809 }
810
811
812 config.ClusterIP = config.NodePortService.Spec.ClusterIP
813 if config.DualStackEnabled {
814 config.SecondaryClusterIP = config.NodePortService.Spec.ClusterIPs[1]
815 }
816
817
818
819
820 family := v1.IPv4Protocol
821 secondaryFamily := v1.IPv6Protocol
822 if netutils.IsIPv6String(config.ClusterIP) {
823 family = v1.IPv6Protocol
824 secondaryFamily = v1.IPv4Protocol
825 }
826 if config.PreferExternalAddresses {
827
828 config.NodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeExternalIP, family)
829 }
830 if config.NodeIP == "" {
831 config.NodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeInternalIP, family)
832 }
833 if config.DualStackEnabled {
834 if config.PreferExternalAddresses {
835 config.SecondaryNodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeExternalIP, secondaryFamily)
836 }
837 if config.SecondaryNodeIP == "" {
838 config.SecondaryNodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeInternalIP, secondaryFamily)
839 }
840 }
841
842 ginkgo.By("Waiting for NodePort service to expose endpoint")
843 err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
844 framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", nodePortServiceName, config.Namespace)
845 ginkgo.By("Waiting for Session Affinity service to expose endpoint")
846 err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, sessionAffinityServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
847 framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", sessionAffinityServiceName, config.Namespace)
848 }
849
850 func (config *NetworkingTestConfig) createNetProxyPods(ctx context.Context, podName string, selector map[string]string) []*v1.Pod {
851 framework.ExpectNoError(e2enode.WaitForAllNodesSchedulable(ctx, config.f.ClientSet, 10*time.Minute))
852 nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, config.f.ClientSet, maxNetProxyPodsCount)
853 framework.ExpectNoError(err)
854 nodes := nodeList.Items
855
856
857 createdPods := make([]*v1.Pod, 0, len(nodes))
858 for i, n := range nodes {
859 podName := fmt.Sprintf("%s-%d", podName, i)
860 hostname, _ := n.Labels["kubernetes.io/hostname"]
861 pod := config.createNetShellPodSpec(podName, hostname)
862 pod.ObjectMeta.Labels = selector
863 pod.Spec.HostNetwork = config.EndpointsHostNetwork
864
865
866 if pod.Spec.HostNetwork && framework.NodeOSDistroIs("windows") {
867 e2epod.WithWindowsHostProcess(pod, "")
868 }
869 createdPod := config.createPod(ctx, pod)
870 createdPods = append(createdPods, createdPod)
871 }
872
873
874 runningPods := make([]*v1.Pod, 0, len(nodes))
875 for _, p := range createdPods {
876 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, config.f.ClientSet, p.Name, config.f.Namespace.Name, framework.PodStartTimeout))
877 rp, err := config.getPodClient().Get(ctx, p.Name, metav1.GetOptions{})
878 framework.ExpectNoError(err)
879 runningPods = append(runningPods, rp)
880 }
881
882 return runningPods
883 }
884
885
886 func (config *NetworkingTestConfig) DeleteNetProxyPod(ctx context.Context) {
887 pod := config.EndpointPods[0]
888 framework.ExpectNoError(config.getPodClient().Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0)))
889 config.EndpointPods = config.EndpointPods[1:]
890
891 err := e2epod.WaitForPodNotFoundInNamespace(ctx, config.f.ClientSet, pod.Name, config.Namespace, wait.ForeverTestTimeout)
892 if err != nil {
893 framework.Failf("Failed to delete %s pod: %v", pod.Name, err)
894 }
895
896 err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
897 if err != nil {
898 framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
899 }
900
901 time.Sleep(5 * time.Second)
902 }
903
904 func (config *NetworkingTestConfig) createPod(ctx context.Context, pod *v1.Pod) *v1.Pod {
905 return config.getPodClient().Create(ctx, pod)
906 }
907
908 func (config *NetworkingTestConfig) getPodClient() *e2epod.PodClient {
909 if config.podClient == nil {
910 config.podClient = e2epod.NewPodClient(config.f)
911 }
912 return config.podClient
913 }
914
915 func (config *NetworkingTestConfig) getServiceClient() coreclientset.ServiceInterface {
916 return config.f.ClientSet.CoreV1().Services(config.Namespace)
917 }
918
919
// HTTPPokeParams configures PokeHTTP. A nil *HTTPPokeParams is treated as the
// zero value. PokeHTTP fills in defaults on the passed struct for zero fields:
// ExpectCode becomes http.StatusOK and Timeout becomes 10 seconds.
type HTTPPokeParams struct {
	Timeout        time.Duration // request timeout; 0 means 10s
	ExpectCode     int           // expected status code; 0 means 200
	BodyContains   string        // presumably a substring the body must contain (checked beyond this view — confirm)
	RetriableCodes []int         // unexpected status codes to report as retriable
	EnableHTTPS    bool          // use https:// instead of http://
}
927
928
// HTTPPokeResult is the outcome of a PokeHTTP call.
type HTTPPokeResult struct {
	Status HTTPPokeStatus // classification of the outcome
	Code   int            // HTTP status code, set once a response was received
	Error  error          // error describing the failure, if any
	Body   []byte         // copy of the response body, when it could be read
}
935
936
// HTTPPokeStatus classifies the outcome of PokeHTTP.
type HTTPPokeStatus string

const (
	// HTTPSuccess presumably means the poke received the expected response.
	HTTPSuccess HTTPPokeStatus = "Success"
	// HTTPError covers failures not matched by a more specific status,
	// including errors reading the response body.
	HTTPError HTTPPokeStatus = "UnknownError"
	// HTTPTimeout means the request failed with a net.Error whose Timeout() is true.
	HTTPTimeout HTTPPokeStatus = "TimedOut"
	// HTTPRefused means the request error text contained "connection refused".
	HTTPRefused HTTPPokeStatus = "ConnectionRefused"
	// HTTPRetryCode presumably means the status code was unexpected but listed
	// in RetriableCodes — confirm against the rest of PokeHTTP.
	HTTPRetryCode HTTPPokeStatus = "RetryCode"
	// HTTPWrongCode presumably means the status code was unexpected and not retriable.
	HTTPWrongCode HTTPPokeStatus = "WrongCode"
	// HTTPBadResponse presumably means the body did not satisfy BodyContains
	// (the check is beyond this view — confirm).
	HTTPBadResponse HTTPPokeStatus = "BadResponse"
)
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971 func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult {
972
973 if params == nil {
974 params = &HTTPPokeParams{}
975 }
976
977 hostPort := net.JoinHostPort(host, strconv.Itoa(port))
978 var url string
979 if params.EnableHTTPS {
980 url = fmt.Sprintf("https://%s%s", hostPort, path)
981 } else {
982 url = fmt.Sprintf("http://%s%s", hostPort, path)
983 }
984
985 ret := HTTPPokeResult{}
986
987
988
989 if host == "" {
990 framework.Failf("Got empty host for HTTP poke (%s)", url)
991 return ret
992 }
993 if port == 0 {
994 framework.Failf("Got port==0 for HTTP poke (%s)", url)
995 return ret
996 }
997
998 if params.ExpectCode == 0 {
999 params.ExpectCode = http.StatusOK
1000 }
1001
1002 if params.Timeout == 0 {
1003 params.Timeout = 10 * time.Second
1004 }
1005
1006 framework.Logf("Poking %q", url)
1007
1008 resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout)
1009 if err != nil {
1010 ret.Error = err
1011 neterr, ok := err.(net.Error)
1012 if ok && neterr.Timeout() {
1013 ret.Status = HTTPTimeout
1014 } else if strings.Contains(err.Error(), "connection refused") {
1015 ret.Status = HTTPRefused
1016 } else {
1017 ret.Status = HTTPError
1018 }
1019 framework.Logf("Poke(%q): %v", url, err)
1020 return ret
1021 }
1022
1023 ret.Code = resp.StatusCode
1024
1025 defer resp.Body.Close()
1026 body, err := io.ReadAll(resp.Body)
1027 if err != nil {
1028 ret.Status = HTTPError
1029 ret.Error = fmt.Errorf("error reading HTTP body: %w", err)
1030 framework.Logf("Poke(%q): %v", url, ret.Error)
1031 return ret
1032 }
1033 ret.Body = make([]byte, len(body))
1034 copy(ret.Body, body)
1035
1036 if resp.StatusCode != params.ExpectCode {
1037 for _, code := range params.RetriableCodes {
1038 if resp.StatusCode == code {
1039 ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode)
1040 ret.Status = HTTPRetryCode
1041 framework.Logf("Poke(%q): %v", url, ret.Error)
1042 return ret
1043 }
1044 }
1045 ret.Status = HTTPWrongCode
1046 ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode)
1047 framework.Logf("Poke(%q): %v", url, ret.Error)
1048 return ret
1049 }
1050
1051 if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) {
1052 ret.Status = HTTPBadResponse
1053 ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body))
1054 framework.Logf("Poke(%q): %v", url, ret.Error)
1055 return ret
1056 }
1057
1058 ret.Status = HTTPSuccess
1059 framework.Logf("Poke(%q): success", url)
1060 return ret
1061 }
1062
1063
1064
1065 func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
1066 tr := utilnet.SetTransportDefaults(&http.Transport{
1067 DisableKeepAlives: true,
1068 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
1069 })
1070 client := &http.Client{
1071 Transport: tr,
1072 Timeout: timeout,
1073 }
1074
1075 return client.Get(url)
1076 }
1077
1078
1079
1080
1081
1082 func TestUnderTemporaryNetworkFailure(ctx context.Context, c clientset.Interface, ns string, node *v1.Node, testFunc func(ctx context.Context)) {
1083 host, err := e2enode.GetSSHExternalIP(node)
1084 if err != nil {
1085 framework.Failf("Error getting node external ip : %v", err)
1086 }
1087 controlPlaneAddresses := framework.GetControlPlaneAddresses(ctx, c)
1088 ginkgo.By(fmt.Sprintf("block network traffic from node %s to the control plane", node.Name))
1089 defer func() {
1090
1091
1092
1093
1094 ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the control plane", node.Name))
1095 for _, instanceAddress := range controlPlaneAddresses {
1096 UnblockNetwork(ctx, host, instanceAddress)
1097 }
1098 }()
1099
1100 framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
1101 if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
1102 framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
1103 }
1104 for _, instanceAddress := range controlPlaneAddresses {
1105 BlockNetwork(ctx, host, instanceAddress)
1106 }
1107
1108 framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
1109 if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
1110 framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
1111 }
1112
1113 testFunc(ctx)
1114
1115 }
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135 func BlockNetwork(ctx context.Context, from string, to string) {
1136 framework.Logf("block network traffic from %s to %s", from, to)
1137 iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
1138 dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
1139 if result, err := e2essh.SSH(ctx, dropCmd, from, framework.TestContext.Provider); result.Code != 0 || err != nil {
1140 e2essh.LogResult(result)
1141 framework.Failf("Unexpected error: %v", err)
1142 }
1143 }
1144
1145
1146 func UnblockNetwork(ctx context.Context, from string, to string) {
1147 framework.Logf("Unblock network traffic from %s to %s", from, to)
1148 iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
1149 undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
1150
1151
1152
1153
1154
1155
1156 err := wait.PollWithContext(ctx, time.Millisecond*100, time.Second*30, func(ctx context.Context) (bool, error) {
1157 result, err := e2essh.SSH(ctx, undropCmd, from, framework.TestContext.Provider)
1158 if result.Code == 0 && err == nil {
1159 return true, nil
1160 }
1161 e2essh.LogResult(result)
1162 if err != nil {
1163 framework.Logf("Unexpected error: %v", err)
1164 }
1165 return false, nil
1166 })
1167 if err != nil {
1168 framework.Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
1169 "required on host %s: remove rule %s, if exists", from, iptablesRule)
1170 }
1171 }
1172
1173
1174 func WaitForService(ctx context.Context, c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
1175 err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
1176 _, err := c.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
1177 switch {
1178 case err == nil:
1179 framework.Logf("Service %s in namespace %s found.", name, namespace)
1180 return exist, nil
1181 case apierrors.IsNotFound(err):
1182 framework.Logf("Service %s in namespace %s disappeared.", name, namespace)
1183 return !exist, nil
1184 case err != nil:
1185 framework.Logf("Non-retryable failure while getting service.")
1186 return false, err
1187 default:
1188 framework.Logf("Get service %s in namespace %s failed: %v", name, namespace, err)
1189 return false, nil
1190 }
1191 })
1192 if err != nil {
1193 stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
1194 return fmt.Errorf("error waiting for service %s/%s %s: %w", namespace, name, stateMsg[exist], err)
1195 }
1196 return nil
1197 }
1198
View as plain text