package network

import (
	"context"
	"fmt"
	"math"
	"net"
	"strconv"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"

	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	"k8s.io/kubernetes/test/e2e/network/common"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"
	netutils "k8s.io/utils/net"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)

var kubeProxyE2eImage = imageutils.GetE2EImage(imageutils.Agnhost)

var _ = common.SIGDescribe("KubeProxy", func() {
	const (
		testDaemonTCPPort     = 11302
		postFinTimeoutSeconds = 30
	)

	fr := framework.NewDefaultFramework("kube-proxy")
	fr.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

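	// Verifies that kube-proxy configures the TCP CLOSE_WAIT conntrack timeout
	// on the node: a deliberately leaked half-closed connection must leave a
	// CLOSE_WAIT conntrack entry whose remaining timeout is close to the
	// expected value.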
	ginkgo.It("should set TCP CLOSE_WAIT timeout [Privileged]", func(ctx context.Context) {
		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, fr.ClientSet, 2)
		framework.ExpectNoError(err)
		if len(nodes.Items) < 2 {
			e2eskipper.Skipf(
				"Test requires >= 2 Ready nodes, but there are only %v nodes",
				len(nodes.Items))
		}

		type NodeInfo struct {
			node   *v1.Node
			name   string
			nodeIP string
		}

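		// Pick node addresses from the IP family the cluster is using.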
		var family v1.IPFamily
		if framework.TestContext.ClusterIsIPv6() {
			family = v1.IPv6Protocol
		} else {
			family = v1.IPv4Protocol
		}

		ips := e2enode.GetAddressesByTypeAndFamily(&nodes.Items[0], v1.NodeInternalIP, family)
		gomega.Expect(ips).ToNot(gomega.BeEmpty())

		clientNodeInfo := NodeInfo{
			node:   &nodes.Items[0],
			name:   nodes.Items[0].Name,
			nodeIP: ips[0],
		}

		ips = e2enode.GetAddressesByTypeAndFamily(&nodes.Items[1], v1.NodeInternalIP, family)
		gomega.Expect(ips).ToNot(gomega.BeEmpty())

		serverNodeInfo := NodeInfo{
			node:   &nodes.Items[1],
			name:   nodes.Items[1].Name,
			nodeIP: ips[0],
		}

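		// Create a privileged pod in the host network namespace of the client
		// node so the test can inspect the node's conntrack table.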
		privileged := true

		hostExecPod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "e2e-net-exec",
				Namespace: fr.Namespace.Name,
				Labels:    map[string]string{"app": "e2e-net-exec"},
			},
			Spec: v1.PodSpec{
				HostNetwork: true,
				NodeName:    clientNodeInfo.name,
				Containers: []v1.Container{
					{
						Name:            "e2e-net-exec",
						Image:           imageutils.GetE2EImage(imageutils.DistrolessIptables),
						ImagePullPolicy: v1.PullIfNotPresent,
						Command:         []string{"sleep", "600"},
						SecurityContext: &v1.SecurityContext{
							Privileged: &privileged,
						},
					},
				},
			},
		}
		e2epod.NewPodClient(fr).CreateSync(ctx, hostExecPod)

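		// Create the client and server pods. The client connects to the server
		// and leaks the connection after the FIN exchange ("LeakConnection":true),
		// leaving a conntrack entry stuck in CLOSE_WAIT.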
		clientPodSpec := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "e2e-net-client",
				Namespace: fr.Namespace.Name,
				Labels:    map[string]string{"app": "e2e-net-client"},
			},
			Spec: v1.PodSpec{
				NodeName: clientNodeInfo.name,
				Containers: []v1.Container{
					{
						Name:            "e2e-net-client",
						Image:           kubeProxyE2eImage,
						ImagePullPolicy: v1.PullIfNotPresent,
						Args: []string{
							"net",
							"--runner", "nat-closewait-client",
							"--options",
							fmt.Sprintf(`{"RemoteAddr":"%v", "PostFinTimeoutSeconds":%v, "TimeoutSeconds":%v, "LeakConnection":true}`,
								net.JoinHostPort(serverNodeInfo.nodeIP, strconv.Itoa(testDaemonTCPPort)),
								postFinTimeoutSeconds,
								0),
						},
					},
				},
			},
		}

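		// The server listens on testDaemonTCPPort and exposes it as a HostPort,
		// so the client's connection shows up in the node's conntrack table.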
		serverPodSpec := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "e2e-net-server",
				Namespace: fr.Namespace.Name,
				Labels:    map[string]string{"app": "e2e-net-server"},
			},
			Spec: v1.PodSpec{
				NodeName: serverNodeInfo.name,
				Containers: []v1.Container{
					{
						Name:            "e2e-net-server",
						Image:           kubeProxyE2eImage,
						ImagePullPolicy: v1.PullIfNotPresent,
						Args: []string{
							"net",
							"--runner", "nat-closewait-server",
							"--options",
							fmt.Sprintf(`{"LocalAddr":":%v", "PostFinTimeoutSeconds":%v}`,
								testDaemonTCPPort,
								postFinTimeoutSeconds),
						},
						Ports: []v1.ContainerPort{
							{
								Name:          "tcp",
								ContainerPort: testDaemonTCPPort,
								HostPort:      testDaemonTCPPort,
							},
						},
					},
				},
			},
		}

		ginkgo.By(fmt.Sprintf(
			"Launching a server daemon on node %v (node ip: %v, image: %v)",
			serverNodeInfo.name,
			serverNodeInfo.nodeIP,
			kubeProxyE2eImage))
		e2epod.NewPodClient(fr).CreateSync(ctx, serverPodSpec)

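		// The server must be up and listening before the client connects,
		// otherwise the test connection cannot be established.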
		if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, fr.ClientSet, serverPodSpec.Name, fr.Namespace.Name, framework.PodStartTimeout); readyErr != nil {
			framework.Failf("error waiting for server pod %s to be ready: %v", serverPodSpec.Name, readyErr)
		}

		ginkgo.By(fmt.Sprintf(
			"Launching a client connection on node %v (node ip: %v, image: %v)",
			clientNodeInfo.name,
			clientNodeInfo.nodeIP,
			kubeProxyE2eImage))
		e2epod.NewPodClient(fr).CreateSync(ctx, clientPodSpec)

		ginkgo.By("Checking conntrack entries for the timeout")
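		// The leaked connection is expected to show kube-proxy's default TCP
		// CLOSE_WAIT conntrack timeout of one hour, give or take epsilonSeconds
		// for the time that has already elapsed since the entry was created.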
		const epsilonSeconds = 60
		const expectedTimeoutSeconds = 60 * 60

		ip := serverNodeInfo.nodeIP
		ipFamily := "ipv4"
		if netutils.IsIPv6String(ip) {
			ipFamily = "ipv6"
		}

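		// List conntrack entries on the client node, filtered by address family,
		// destination IP, and destination port, and take the first CLOSE_WAIT
		// match. The third field of conntrack's output is the remaining timeout
		// of the entry in seconds.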
		cmd := fmt.Sprintf("conntrack -L -f %s -d %v "+
			"| grep -m 1 'CLOSE_WAIT.*dport=%v' ",
			ipFamily, ip, testDaemonTCPPort)
		if err := wait.PollImmediate(2*time.Second, epsilonSeconds*time.Second, func() (bool, error) {
			result, err := e2eoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd)
			if err != nil {
				framework.Logf("failed to obtain conntrack entry: %v %v", result, err)
				return false, nil
			}
			framework.Logf("conntrack entry for node %v and port %v: %v", serverNodeInfo.nodeIP, testDaemonTCPPort, result)
			line := strings.Fields(result)
			if len(line) < 3 {
				return false, fmt.Errorf("conntrack entry does not have a timeout field: %v", line)
			}
			timeoutSeconds, err := strconv.Atoi(line[2])
			if err != nil {
				return false, fmt.Errorf("failed to convert matched timeout %s to integer: %w", line[2], err)
			}
			if math.Abs(float64(timeoutSeconds-expectedTimeoutSeconds)) < epsilonSeconds {
				return true, nil
			}
			return false, fmt.Errorf("wrong TCP CLOSE_WAIT timeout: %v expected: %v", timeoutSeconds, expectedTimeoutSeconds)
		}); err != nil {
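			// No matching entry was found in time; dump the full conntrack
			// table to make debugging easier before failing the test.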
			result, err2 := e2eoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", "conntrack -L")
			if err2 != nil {
				framework.Logf("failed to obtain conntrack entry: %v %v", result, err2)
			}
			framework.Logf("conntrack entries for node %v: %v", serverNodeInfo.nodeIP, result)
			framework.Failf("no valid conntrack entry for port %d on node %s: %v", testDaemonTCPPort, serverNodeInfo.nodeIP, err)
		}
	})

})