1
16
17 package network
18
19
20 import (
21 "context"
22 "fmt"
23 "time"
24
25 "github.com/onsi/ginkgo/v2"
26 appsv1 "k8s.io/api/apps/v1"
27 v1 "k8s.io/api/core/v1"
28 discoveryv1 "k8s.io/api/discovery/v1"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/wait"
31 clientset "k8s.io/client-go/kubernetes"
32 "k8s.io/kubernetes/test/e2e/feature"
33 "k8s.io/kubernetes/test/e2e/framework"
34 e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
35 e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
36 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
37 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
38 "k8s.io/kubernetes/test/e2e/network/common"
39 imageutils "k8s.io/kubernetes/test/utils/image"
40 admissionapi "k8s.io/pod-security-admission/api"
41 )
42
// Shared settings for the iperf2 network performance test in this file.
const (
	// largeClusterTimeout is the upper bound passed to each polling wait in
	// the test (server endpoints, client pods).
	largeClusterTimeout = 400 * time.Second

	// iperf2Port is the TCP port the iperf2 server container listens on; the
	// server Service exposes the same port and the clients connect to it.
	iperf2Port = 6789

	// iperf2BaselineBandwidthMegabytesPerSecond is the minimum total bandwidth,
	// in MB/s, that every client must report; anything lower fails the test.
	iperf2BaselineBandwidthMegabytesPerSecond = 10

	// labelKey is the pod label key shared by the server and client workloads.
	labelKey = "app"
	// serverLabelValue is the labelKey value carried by the iperf2 server pod.
	serverLabelValue = "iperf2-server"
	// clientLabelValue is the labelKey value carried by the iperf2 client pods.
	clientLabelValue = "iperf2-client"

	// serverServiceName is the name of the Service fronting the iperf2 server;
	// clients use it as the iperf target host.
	serverServiceName = "iperf2-server"
)
60
61 func iperf2ServerDeployment(ctx context.Context, client clientset.Interface, namespace string, isIPV6 bool) (*appsv1.Deployment, error) {
62 framework.Logf("deploying iperf2 server")
63 one := int64(1)
64 replicas := int32(1)
65 labels := map[string]string{labelKey: serverLabelValue}
66 args := []string{
67 "-s",
68 "-p",
69 fmt.Sprintf("%d", iperf2Port),
70 }
71 if isIPV6 {
72 args = append(args, "-V")
73 }
74 deploymentSpec := e2edeployment.NewDeployment(
75 "iperf2-server-deployment", replicas, labels, "iperf2-server",
76 imageutils.GetE2EImage(imageutils.Agnhost), appsv1.RollingUpdateDeploymentStrategyType)
77 deploymentSpec.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
78 deploymentSpec.Spec.Template.Spec.Containers[0].Command = []string{"iperf"}
79 deploymentSpec.Spec.Template.Spec.Containers[0].Args = args
80 deploymentSpec.Spec.Template.Spec.Containers[0].Ports = []v1.ContainerPort{
81 {
82 ContainerPort: iperf2Port,
83 Protocol: v1.ProtocolTCP,
84 },
85 }
86
87 deployment, err := client.AppsV1().Deployments(namespace).Create(ctx, deploymentSpec, metav1.CreateOptions{})
88 if err != nil {
89 return nil, fmt.Errorf("deployment %q Create API error: %w", deploymentSpec.Name, err)
90 }
91 framework.Logf("Waiting for deployment %q to complete", deploymentSpec.Name)
92 err = e2edeployment.WaitForDeploymentComplete(client, deployment)
93 if err != nil {
94 return nil, fmt.Errorf("deployment %q failed to complete: %w", deploymentSpec.Name, err)
95 }
96
97 return deployment, nil
98 }
99
100 func iperf2ServerService(ctx context.Context, client clientset.Interface, namespace string) (*v1.Service, error) {
101 service := &v1.Service{
102 ObjectMeta: metav1.ObjectMeta{Name: serverServiceName},
103 Spec: v1.ServiceSpec{
104 Selector: map[string]string{
105 labelKey: serverLabelValue,
106 },
107 Ports: []v1.ServicePort{
108 {Protocol: v1.ProtocolTCP, Port: iperf2Port},
109 },
110 },
111 }
112 return client.CoreV1().Services(namespace).Create(ctx, service, metav1.CreateOptions{})
113 }
114
115 func iperf2ClientDaemonSet(ctx context.Context, client clientset.Interface, namespace string) (*appsv1.DaemonSet, error) {
116 one := int64(1)
117 labels := map[string]string{labelKey: clientLabelValue}
118 spec := e2edaemonset.NewDaemonSet("iperf2-clients", imageutils.GetE2EImage(imageutils.Agnhost), labels, nil, nil, nil)
119 spec.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
120
121 ds, err := client.AppsV1().DaemonSets(namespace).Create(ctx, spec, metav1.CreateOptions{})
122 if err != nil {
123 return nil, fmt.Errorf("daemonset %s Create API error: %w", spec.Name, err)
124 }
125 return ds, nil
126 }
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 var _ = common.SIGDescribe("Networking IPerf2", feature.NetworkingPerformance, func() {
142
143 f := framework.NewDefaultFramework("network-perf")
144 f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
145
146 ginkgo.It("should run iperf2", func(ctx context.Context) {
147 readySchedulableNodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
148 framework.ExpectNoError(err)
149
150 familyStr := ""
151 if framework.TestContext.ClusterIsIPv6() {
152 familyStr = "-V "
153 }
154
155 serverPodsListOptions := metav1.ListOptions{
156 LabelSelector: fmt.Sprintf("%s=%s", labelKey, serverLabelValue),
157 }
158
159
160 _, err = iperf2ServerDeployment(ctx, f.ClientSet, f.Namespace.Name, framework.TestContext.ClusterIsIPv6())
161 framework.ExpectNoError(err, "deploy iperf2 server deployment")
162
163 _, err = iperf2ServerService(ctx, f.ClientSet, f.Namespace.Name)
164 framework.ExpectNoError(err, "deploy iperf2 server service")
165
166
167
168 _, err = iperf2ClientDaemonSet(ctx, f.ClientSet, f.Namespace.Name)
169 framework.ExpectNoError(err, "deploy iperf2 client daemonset")
170
171
172 framework.Logf("waiting for iperf2 server endpoints")
173 err = wait.Poll(2*time.Second, largeClusterTimeout, func() (done bool, err error) {
174 listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serverServiceName)}
175 esList, err := f.ClientSet.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(ctx, listOptions)
176 framework.ExpectNoError(err, "Error fetching EndpointSlice for Service %s/%s", f.Namespace.Name, serverServiceName)
177
178 if len(esList.Items) == 0 {
179 framework.Logf("EndpointSlice for Service %s/%s not found", f.Namespace.Name, serverServiceName)
180 return false, nil
181 }
182 return true, nil
183 })
184 framework.ExpectNoError(err, "unable to wait for endpoints for the iperf service")
185 framework.Logf("found iperf2 server endpoints")
186
187 clientPodsListOptions := metav1.ListOptions{
188 LabelSelector: fmt.Sprintf("%s=%s", labelKey, clientLabelValue),
189 }
190
191 framework.Logf("waiting for client pods to be running")
192 var clientPodList *v1.PodList
193 err = wait.Poll(2*time.Second, largeClusterTimeout, func() (done bool, err error) {
194 clientPodList, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, clientPodsListOptions)
195 if err != nil {
196 return false, err
197 }
198 if len(clientPodList.Items) < len(readySchedulableNodes.Items) {
199 return false, nil
200 }
201 for _, pod := range clientPodList.Items {
202 if pod.Status.Phase != v1.PodRunning {
203 return false, nil
204 }
205 }
206 return true, nil
207 })
208 framework.ExpectNoError(err, "unable to wait for client pods to come up")
209 framework.Logf("all client pods are ready: %d pods", len(clientPodList.Items))
210
211
212 serverPodList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, serverPodsListOptions)
213 framework.ExpectNoError(err)
214 if len(serverPodList.Items) != 1 {
215 framework.Failf("expected 1 server pod, found %d", len(serverPodList.Items))
216 }
217 serverPod := serverPodList.Items[0]
218 framework.Logf("server pod phase %s", serverPod.Status.Phase)
219 for i, condition := range serverPod.Status.Conditions {
220 framework.Logf("server pod condition %d: %+v", i, condition)
221 }
222 for i, cont := range serverPod.Status.ContainerStatuses {
223 framework.Logf("server pod container status %d: %+v", i, cont)
224 }
225
226 framework.Logf("found %d matching client pods", len(clientPodList.Items))
227
228 nodeResults := &IPerf2NodeToNodeCSVResults{
229 ServerNode: serverPod.Spec.NodeName,
230 Results: map[string]*IPerf2EnhancedCSVResults{},
231 }
232
233
234
235 for _, pod := range clientPodList.Items {
236 podName := pod.Name
237 nodeName := pod.Spec.NodeName
238
239 iperfVersion := e2epod.ExecShellInPod(ctx, f, podName, "iperf -v || true")
240 framework.Logf("iperf version: %s", iperfVersion)
241
242 for try := 0; ; try++ {
243
250 command := fmt.Sprintf(`iperf %s -e -p %d --reportstyle C -i 1 -c %s && sleep 5`, familyStr, iperf2Port, serverServiceName)
251 framework.Logf("attempting to run command '%s' in client pod %s (node %s)", command, podName, nodeName)
252 output := e2epod.ExecShellInPod(ctx, f, podName, command)
253 framework.Logf("output from exec on client pod %s (node %s): \n%s\n", podName, nodeName, output)
254
255 results, err := ParseIPerf2EnhancedResultsFromCSV(output)
256 if err == nil {
257 nodeResults.Results[nodeName] = results
258 break
259 } else if try == 2 {
260 framework.ExpectNoError(err, "unable to parse iperf2 output from client pod %s (node %s)", pod.Name, nodeName)
261 } else {
262 framework.Logf("Retrying: IPerf run failed: %+v", err)
263 }
264 }
265 }
266
267
268
277 framework.Logf("%35s%35s%20s", "From", "To", "Bandwidth (MB/s)")
278 for nodeFrom, results := range nodeResults.Results {
279 framework.Logf("%35s%35s%20d", nodeFrom, nodeResults.ServerNode, results.Total.bandwidthMB())
280 }
281 for clientNode, results := range nodeResults.Results {
282 megabytesPerSecond := results.Total.bandwidthMB()
283 if megabytesPerSecond < iperf2BaselineBandwidthMegabytesPerSecond {
284 framework.Failf("iperf2 MB/s received below baseline of %d for client %s to server %s: %d", iperf2BaselineBandwidthMegabytesPerSecond, clientNode, nodeResults.ServerNode, megabytesPerSecond)
285 }
286 }
287 })
288 })
289
View as plain text