1
16
17 package network
18
19 import (
20 "context"
21 "fmt"
22 "slices"
23 "strings"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 discoveryv1 "k8s.io/api/discovery/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/util/intstr"
30 clientset "k8s.io/client-go/kubernetes"
31 "k8s.io/kubernetes/test/e2e/feature"
32 "k8s.io/kubernetes/test/e2e/framework"
33 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
34 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
35 e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
36 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
37 "k8s.io/kubernetes/test/e2e/network/common"
38 "k8s.io/kubernetes/test/utils/format"
39 admissionapi "k8s.io/pod-security-admission/api"
40
41 "github.com/onsi/ginkgo/v2"
42 "github.com/onsi/gomega"
43 )
44
45 var _ = common.SIGDescribe(feature.TrafficDistribution, func() {
46 f := framework.NewDefaultFramework("traffic-distribution")
47 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
48
49 var c clientset.Interface
50
51 ginkgo.BeforeEach(func(ctx context.Context) {
52 c = f.ClientSet
53 e2eskipper.SkipUnlessMultizone(ctx, c)
54 })
55
56
57
58
59
60
61
62
63 endpointSlicesForService := func(serviceName string) any {
64 return func(ctx context.Context) ([]discoveryv1.EndpointSlice, error) {
65 slices, err := c.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
66 if err != nil {
67 return nil, err
68 }
69 return slices.Items, nil
70 }
71 }
72
73
74
75 gomegaCustomError := func(format string, a ...any) func() string {
76 return func() string {
77 return fmt.Sprintf(format, a...)
78 }
79 }
80
81
82
83
84 endpointSlicesHaveSameZoneHints := framework.MakeMatcher(func(slices []discoveryv1.EndpointSlice) (func() string, error) {
85 if len(slices) == 0 {
86 return nil, fmt.Errorf("no endpointslices found")
87 }
88 for _, slice := range slices {
89 for _, endpoint := range slice.Endpoints {
90 var ip string
91 if len(endpoint.Addresses) > 0 {
92 ip = endpoint.Addresses[0]
93 }
94 var zone string
95 if endpoint.Zone != nil {
96 zone = *endpoint.Zone
97 }
98 if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone {
99 return gomegaCustomError("endpoint with ip %v does not have the correct hint, want hint for zone %q\nEndpointSlices=\n%v", ip, zone, format.Object(slices, 1 )), nil
100 }
101 }
102 }
103 return nil, nil
104 })
105
106
107
108
109 requestsFromClient := func(clientPod *v1.Pod) any {
110 return func(ctx context.Context) (reverseChronologicalLogLines []string, err error) {
111 logs, err := e2epod.GetPodLogs(ctx, c, f.Namespace.Name, clientPod.Name, clientPod.Spec.Containers[0].Name)
112 if err != nil {
113 return nil, err
114 }
115 logLines := strings.Split(logs, "\n")
116 slices.Reverse(logLines)
117 return logLines, nil
118 }
119 }
120
121
122
123
124
125 ginkgo.When("Service has trafficDistribution=PreferClose", func() {
126 ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) {
127
128 ginkgo.By("finding 3 zones with schedulable nodes")
129 allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c)
130 framework.ExpectNoError(err)
131 if len(allZonesSet) < 3 {
132 framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet))
133 }
134 zones := allZonesSet.UnsortedList()[:3]
135
136 ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
137 nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
138 framework.ExpectNoError(err)
139 nodeForZone := make(map[string]string)
140 for _, zone := range zones {
141 found := false
142 for _, node := range nodeList.Items {
143 if zone == node.Labels[v1.LabelTopologyZone] {
144 found = true
145 nodeForZone[zone] = node.GetName()
146 }
147 }
148 if !found {
149 framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 ))
150 }
151 }
152
153 ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
154 zoneForServingPod := make(map[string]string)
155 var servingPods []*v1.Pod
156 servingPodLabels := map[string]string{"app": f.UniqueName}
157 for _, zone := range zones[:2] {
158 pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
159 nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
160 e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
161 pod.Labels = servingPodLabels
162
163 servingPods = append(servingPods, pod)
164 zoneForServingPod[pod.Name] = zone
165 ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
166 }
167 e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
168
169 trafficDist := v1.ServiceTrafficDistributionPreferClose
170 svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
171 ObjectMeta: metav1.ObjectMeta{
172 Name: "traffic-dist-test-service",
173 },
174 Spec: v1.ServiceSpec{
175 Selector: servingPodLabels,
176 TrafficDistribution: &trafficDist,
177 Ports: []v1.ServicePort{{
178 Port: 80,
179 TargetPort: intstr.FromInt32(9376),
180 Protocol: v1.ProtocolTCP,
181 }},
182 },
183 })
184 ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
185 ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Services(f.Namespace.Name).Delete), svc.GetName(), metav1.DeleteOptions{})
186
187 ginkgo.By("ensuring EndpointSlice for service have correct same-zone hints")
188 gomega.Eventually(ctx, endpointSlicesForService(svc.GetName())).WithPolling(5 * time.Second).WithTimeout(e2eservice.ServiceEndpointsTimeout).Should(endpointSlicesHaveSameZoneHints)
189
190 ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
191
192 createClientPod := func(ctx context.Context, zone string) *v1.Pod {
193 pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
194 pod.Spec.NodeName = nodeForZone[zone]
195 nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
196 e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
197 cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
198 pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
199 pod.Spec.Containers[0].Name = pod.Name
200
201 ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
202 return e2epod.NewPodClient(f).CreateSync(ctx, pod)
203 }
204
205 for _, clientZone := range zones[:2] {
206 framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
207 clientPod := createClientPod(ctx, clientZone)
208
209 framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
210
211 requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
212 logLines := reverseChronologicalLogLines
213 if len(logLines) < 20 {
214 return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
215 }
216 consecutiveSameZone := 0
217
218 for _, logLine := range logLines {
219 if logLine == "" || strings.HasPrefix(logLine, "Date:") {
220 continue
221 }
222 destZone, ok := zoneForServingPod[logLine]
223 if !ok {
224 return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
225 }
226 if clientZone != destZone {
227 return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
228 }
229 consecutiveSameZone++
230 if consecutiveSameZone >= 10 {
231 return nil, nil
232 }
233 }
234
235 return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
236 })
237
238 gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
239 }
240
241 ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
242
243 clientZone := zones[2]
244 framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
245 clientPod := createClientPod(ctx, clientZone)
246
247 framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
248
249 requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
250 logLines := reverseChronologicalLogLines
251 if len(logLines) < 20 {
252 return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
253 }
254
255
256
257 consecutiveSuccessfulRequests := 0
258
259 for _, logLine := range logLines {
260 if logLine == "" || strings.HasPrefix(logLine, "Date:") {
261 continue
262 }
263 _, servingPodExists := zoneForServingPod[logLine]
264 if !servingPodExists {
265 return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
266 }
267 consecutiveSuccessfulRequests++
268 if consecutiveSuccessfulRequests >= 10 {
269 return nil, nil
270 }
271 }
272
273 return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
274 })
275
276 gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
277
278 })
279
280 })
281 })
282
View as plain text