1
16
17 package network
18
19 import (
20 "context"
21 "fmt"
22 "strings"
23 "time"
24
25 "github.com/onsi/ginkgo/v2"
26 v1 "k8s.io/api/core/v1"
27 discoveryv1 "k8s.io/api/discovery/v1"
28 "k8s.io/apimachinery/pkg/api/resource"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/intstr"
31 "k8s.io/apimachinery/pkg/util/wait"
32 clientset "k8s.io/client-go/kubernetes"
33 "k8s.io/kubernetes/test/e2e/feature"
34 "k8s.io/kubernetes/test/e2e/framework"
35 e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
36 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
37 e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
38 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
39 "k8s.io/kubernetes/test/e2e/network/common"
40 imageutils "k8s.io/kubernetes/test/utils/image"
41 admissionapi "k8s.io/pod-security-admission/api"
42 )
43
44 var _ = common.SIGDescribe(feature.TopologyHints, func() {
45 f := framework.NewDefaultFramework("topology-hints")
46 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
47
48
49 var c clientset.Interface
50
51 ginkgo.BeforeEach(func(ctx context.Context) {
52 c = f.ClientSet
53 e2eskipper.SkipUnlessMultizone(ctx, c)
54 })
55
56 ginkgo.It("should distribute endpoints evenly", func(ctx context.Context) {
57 portNum := int32(9376)
58 thLabels := map[string]string{labelKey: clientLabelValue}
59 img := imageutils.GetE2EImage(imageutils.Agnhost)
60 ports := []v1.ContainerPort{{ContainerPort: int32(portNum)}}
61 dsConf := e2edaemonset.NewDaemonSet("topology-serve-hostname", img, thLabels, nil, nil, ports, "serve-hostname")
62 ds, err := c.AppsV1().DaemonSets(f.Namespace.Name).Create(ctx, dsConf, metav1.CreateOptions{})
63 framework.ExpectNoError(err, "error creating DaemonSet")
64
65 svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
66 ObjectMeta: metav1.ObjectMeta{
67 Name: "topology-hints",
68 Annotations: map[string]string{
69 v1.AnnotationTopologyMode: "Auto",
70 },
71 },
72 Spec: v1.ServiceSpec{
73 Selector: thLabels,
74 PublishNotReadyAddresses: true,
75 Ports: []v1.ServicePort{{
76 Name: "example",
77 Port: 80,
78 TargetPort: intstr.FromInt32(portNum),
79 Protocol: v1.ProtocolTCP,
80 }},
81 },
82 })
83
84 err = wait.PollWithContext(ctx, 5*time.Second, framework.PodStartTimeout, func(ctx context.Context) (bool, error) {
85 return e2edaemonset.CheckRunningOnAllNodes(ctx, f, ds)
86 })
87 framework.ExpectNoError(err, "timed out waiting for DaemonSets to be ready")
88
89
90 schedulableNodes := map[string]*v1.Node{}
91 for _, nodeName := range e2edaemonset.SchedulableNodes(ctx, c, ds) {
92 schedulableNodes[nodeName] = nil
93 }
94
95 nodeList, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
96 framework.ExpectNoError(err, "Error when listing all Nodes")
97 var lastNodeCPU resource.Quantity
98 firstNode := true
99 for i := range nodeList.Items {
100 node := nodeList.Items[i]
101 if _, ok := schedulableNodes[node.Name]; !ok {
102 continue
103 }
104 schedulableNodes[node.Name] = &node
105
106 nodeCPU, found := node.Status.Allocatable[v1.ResourceCPU]
107 if !found {
108 framework.Failf("Error when getting allocatable CPU of Node '%s'", node.Name)
109 }
110 if firstNode {
111 lastNodeCPU = nodeCPU
112 firstNode = false
113 } else if !nodeCPU.Equal(lastNodeCPU) {
114 e2eskipper.Skipf("Expected Nodes to have equivalent allocatable CPUs, but Node '%s' is different from the previous one. %d not equals %d",
115 node.Name, nodeCPU.Value(), lastNodeCPU.Value())
116 }
117 }
118
119 framework.Logf("Waiting for %d endpoints to be tracked in EndpointSlices", len(schedulableNodes))
120
121 var finalSlices []discoveryv1.EndpointSlice
122 err = wait.PollWithContext(ctx, 5*time.Second, 3*time.Minute, func(ctx context.Context) (bool, error) {
123 slices, listErr := c.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.Name)})
124 if listErr != nil {
125 return false, listErr
126 }
127
128 numEndpoints := 0
129 for _, slice := range slices.Items {
130 numEndpoints += len(slice.Endpoints)
131 }
132 if len(schedulableNodes) > numEndpoints {
133 framework.Logf("Expected %d endpoints, got %d", len(schedulableNodes), numEndpoints)
134 return false, nil
135 }
136
137 finalSlices = slices.Items
138 return true, nil
139 })
140 framework.ExpectNoError(err, "timed out waiting for EndpointSlices to be ready")
141
142 ginkgo.By("having hints set for each endpoint")
143 for _, slice := range finalSlices {
144 for _, ep := range slice.Endpoints {
145 if ep.Zone == nil {
146 framework.Failf("Expected endpoint in %s to have zone: %v", slice.Name, ep)
147 }
148 if ep.Hints == nil || len(ep.Hints.ForZones) == 0 {
149 framework.Failf("Expected endpoint in %s to have hints: %v", slice.Name, ep)
150 }
151 if len(ep.Hints.ForZones) > 1 {
152 framework.Failf("Expected endpoint in %s to have exactly 1 zone hint, got %d: %v", slice.Name, len(ep.Hints.ForZones), ep)
153 }
154 if *ep.Zone != ep.Hints.ForZones[0].Name {
155 framework.Failf("Expected endpoint in %s to have same zone hint, got %s: %v", slice.Name, *ep.Zone, ep)
156 }
157 }
158 }
159
160 nodesByZone := map[string]string{}
161 zonesWithNode := map[string]string{}
162 for _, node := range schedulableNodes {
163 if zone, ok := node.Labels[v1.LabelTopologyZone]; ok {
164 nodesByZone[node.Name] = zone
165 zonesWithNode[zone] = node.Name
166 }
167 }
168
169 podList, err := c.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{})
170 framework.ExpectNoError(err)
171 podsByZone := map[string]string{}
172 for _, pod := range podList.Items {
173 if zone, ok := nodesByZone[pod.Spec.NodeName]; ok {
174 podsByZone[pod.Name] = zone
175 }
176 }
177
178 ginkgo.By("keeping requests in the same zone")
179 for fromZone, nodeName := range zonesWithNode {
180 ginkgo.By("creating a client pod for probing the service from " + fromZone)
181 podName := "curl-from-" + fromZone
182 clientPod := e2epod.NewAgnhostPod(f.Namespace.Name, podName, nil, nil, nil, "serve-hostname")
183 nodeSelection := e2epod.NodeSelection{Name: nodeName}
184 e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
185 cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
186 clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
187 clientPod.Spec.Containers[0].Name = clientPod.Name
188 e2epod.NewPodClient(f).CreateSync(ctx, clientPod)
189
190 framework.Logf("Ensuring that requests from %s pod on %s node stay in %s zone", clientPod.Name, nodeName, fromZone)
191
192 var logs string
193 if pollErr := wait.PollWithContext(ctx, 5*time.Second, e2eservice.KubeProxyLagTimeout, func(ctx context.Context) (bool, error) {
194 var err error
195 logs, err = e2epod.GetPodLogs(ctx, c, f.Namespace.Name, clientPod.Name, clientPod.Name)
196 framework.ExpectNoError(err)
197 framework.Logf("Pod client logs: %s", logs)
198
199 logLines := strings.Split(logs, "\n")
200 if len(logLines) < 6 {
201 framework.Logf("only %d log lines, waiting for at least 6", len(logLines))
202 return false, nil
203 }
204
205 consecutiveSameZone := 0
206
207 for i := len(logLines) - 1; i > 0; i-- {
208 if logLines[i] == "" || strings.HasPrefix(logLines[i], "Date:") {
209 continue
210 }
211 destZone, ok := podsByZone[logLines[i]]
212 if !ok {
213 framework.Logf("could not determine dest zone from log line: %s", logLines[i])
214 return false, nil
215 }
216 if fromZone != destZone {
217 framework.Logf("expected request from %s to stay in %s zone, delivered to %s zone", clientPod.Name, fromZone, destZone)
218 return false, nil
219 }
220 consecutiveSameZone++
221 if consecutiveSameZone >= 5 {
222 return true, nil
223 }
224 }
225
226 return false, nil
227 }); pollErr != nil {
228 framework.Failf("expected 5 consecutive requests from %s to stay in zone %s within %v, stdout: %v", clientPod.Name, fromZone, e2eservice.KubeProxyLagTimeout, logs)
229 }
230 }
231 })
232 })
233
View as plain text