1
16
17 package service
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "testing"
24 "time"
25
26 corev1 "k8s.io/api/core/v1"
27 discoveryv1 "k8s.io/api/discovery/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/util/intstr"
30 "k8s.io/apimachinery/pkg/util/wait"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 "k8s.io/client-go/informers"
33 clientset "k8s.io/client-go/kubernetes"
34 featuregatetesting "k8s.io/component-base/featuregate/testing"
35 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
36 "k8s.io/kubernetes/pkg/controller/endpointslice"
37 "k8s.io/kubernetes/pkg/features"
38 "k8s.io/kubernetes/test/integration/framework"
39 "k8s.io/kubernetes/test/utils/format"
40 "k8s.io/kubernetes/test/utils/ktesting"
41 )
42
43
44
45
46
47 func Test_ExternalNameServiceStopsDefaultingInternalTrafficPolicy(t *testing.T) {
48 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
49 defer server.TearDownFn()
50
51 client, err := clientset.NewForConfig(server.ClientConfig)
52 if err != nil {
53 t.Fatalf("Error creating clientset: %v", err)
54 }
55
56 ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
57 defer framework.DeleteNamespaceOrDie(client, ns, t)
58
59 service := &corev1.Service{
60 ObjectMeta: metav1.ObjectMeta{
61 Name: "test-123",
62 },
63 Spec: corev1.ServiceSpec{
64 Type: corev1.ServiceTypeExternalName,
65 ExternalName: "foo.bar.com",
66 },
67 }
68
69 service, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
70 if err != nil {
71 t.Fatalf("Error creating test service: %v", err)
72 }
73
74 if service.Spec.InternalTrafficPolicy != nil {
75 t.Errorf("service internalTrafficPolicy should be droppped but is set: %v", service.Spec.InternalTrafficPolicy)
76 }
77
78 service, err = client.CoreV1().Services(ns.Name).Get(context.TODO(), service.Name, metav1.GetOptions{})
79 if err != nil {
80 t.Fatalf("error getting service: %v", err)
81 }
82
83 if service.Spec.InternalTrafficPolicy != nil {
84 t.Errorf("service internalTrafficPolicy should be droppped but is set: %v", service.Spec.InternalTrafficPolicy)
85 }
86 }
87
88
89
90
91 func Test_ExternalNameServiceDropsInternalTrafficPolicy(t *testing.T) {
92 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
93 defer server.TearDownFn()
94
95 client, err := clientset.NewForConfig(server.ClientConfig)
96 if err != nil {
97 t.Fatalf("Error creating clientset: %v", err)
98 }
99
100 ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
101 defer framework.DeleteNamespaceOrDie(client, ns, t)
102
103 internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster
104 service := &corev1.Service{
105 ObjectMeta: metav1.ObjectMeta{
106 Name: "test-123",
107 },
108 Spec: corev1.ServiceSpec{
109 Type: corev1.ServiceTypeExternalName,
110 ExternalName: "foo.bar.com",
111 InternalTrafficPolicy: &internalTrafficPolicy,
112 },
113 }
114
115 service, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
116 if err != nil {
117 t.Fatalf("Error creating test service: %v", err)
118 }
119
120 if service.Spec.InternalTrafficPolicy != nil {
121 t.Errorf("service internalTrafficPolicy should be droppped but is set: %v", service.Spec.InternalTrafficPolicy)
122 }
123
124 service, err = client.CoreV1().Services(ns.Name).Get(context.TODO(), service.Name, metav1.GetOptions{})
125 if err != nil {
126 t.Fatalf("error getting service: %v", err)
127 }
128
129 if service.Spec.InternalTrafficPolicy != nil {
130 t.Errorf("service internalTrafficPolicy should be droppped but is set: %v", service.Spec.InternalTrafficPolicy)
131 }
132 }
133
134
135
136
137
138 func Test_ConvertingToExternalNameServiceDropsInternalTrafficPolicy(t *testing.T) {
139 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
140 defer server.TearDownFn()
141
142 client, err := clientset.NewForConfig(server.ClientConfig)
143 if err != nil {
144 t.Fatalf("Error creating clientset: %v", err)
145 }
146
147 ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
148 defer framework.DeleteNamespaceOrDie(client, ns, t)
149
150 service := &corev1.Service{
151 ObjectMeta: metav1.ObjectMeta{
152 Name: "test-123",
153 },
154 Spec: corev1.ServiceSpec{
155 Type: corev1.ServiceTypeClusterIP,
156 Ports: []corev1.ServicePort{{
157 Port: int32(80),
158 }},
159 Selector: map[string]string{
160 "foo": "bar",
161 },
162 },
163 }
164
165 service, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
166 if err != nil {
167 t.Fatalf("Error creating test service: %v", err)
168 }
169
170 if *service.Spec.InternalTrafficPolicy != corev1.ServiceInternalTrafficPolicyCluster {
171 t.Error("service internalTrafficPolicy was not set for clusterIP Service")
172 }
173
174 newService := service.DeepCopy()
175 newService.Spec.Type = corev1.ServiceTypeExternalName
176 newService.Spec.ExternalName = "foo.bar.com"
177
178 service, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), newService, metav1.UpdateOptions{})
179 if err != nil {
180 t.Fatalf("error updating service: %v", err)
181 }
182
183 if service.Spec.InternalTrafficPolicy != nil {
184 t.Errorf("service internalTrafficPolicy should be droppped but is set: %v", service.Spec.InternalTrafficPolicy)
185 }
186
187 service, err = client.CoreV1().Services(ns.Name).Get(context.TODO(), service.Name, metav1.GetOptions{})
188 if err != nil {
189 t.Fatalf("error getting service: %v", err)
190 }
191
192 if service.Spec.InternalTrafficPolicy != nil {
193 t.Errorf("service internalTrafficPolicy should be droppped but is set: %v", service.Spec.InternalTrafficPolicy)
194 }
195 }
196
197
198
199 func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *testing.T) {
200 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
201 defer server.TearDownFn()
202
203 client, err := clientset.NewForConfig(server.ClientConfig)
204 if err != nil {
205 t.Fatalf("Error creating clientset: %v", err)
206 }
207
208 ns := framework.CreateNamespaceOrDie(client, "test-removing-external-ips-drops-external-traffic-policy", t)
209 defer framework.DeleteNamespaceOrDie(client, ns, t)
210
211 service := &corev1.Service{
212 ObjectMeta: metav1.ObjectMeta{
213 Name: "test-123",
214 },
215 Spec: corev1.ServiceSpec{
216 Type: corev1.ServiceTypeClusterIP,
217 Ports: []corev1.ServicePort{{
218 Port: int32(80),
219 }},
220 Selector: map[string]string{
221 "foo": "bar",
222 },
223 ExternalIPs: []string{"1.1.1.1"},
224 },
225 }
226
227 service, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
228 if err != nil {
229 t.Fatalf("Error creating test service: %v", err)
230 }
231
232 if service.Spec.ExternalTrafficPolicy != corev1.ServiceExternalTrafficPolicyCluster {
233 t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
234 }
235
236
237 newService := service.DeepCopy()
238 newService.Spec.ExternalIPs = []string{}
239
240 service, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), newService, metav1.UpdateOptions{})
241 if err != nil {
242 t.Fatalf("error updating service: %v", err)
243 }
244
245 if service.Spec.ExternalTrafficPolicy != "" {
246 t.Errorf("service externalTrafficPolicy should be droppped but is set: %v", service.Spec.ExternalTrafficPolicy)
247 }
248
249 service, err = client.CoreV1().Services(ns.Name).Get(context.TODO(), service.Name, metav1.GetOptions{})
250 if err != nil {
251 t.Fatalf("error getting service: %v", err)
252 }
253
254 if service.Spec.ExternalTrafficPolicy != "" {
255 t.Errorf("service externalTrafficPolicy should be droppped but is set: %v", service.Spec.ExternalTrafficPolicy)
256 }
257
258
259 newService = service.DeepCopy()
260 newService.Spec.ExternalIPs = []string{"1.1.1.1"}
261
262 service, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), newService, metav1.UpdateOptions{})
263 if err != nil {
264 t.Fatalf("error updating service: %v", err)
265 }
266
267 if service.Spec.ExternalTrafficPolicy != corev1.ServiceExternalTrafficPolicyCluster {
268 t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
269 }
270
271 service, err = client.CoreV1().Services(ns.Name).Get(context.TODO(), service.Name, metav1.GetOptions{})
272 if err != nil {
273 t.Fatalf("error getting service: %v", err)
274 }
275
276 if service.Spec.ExternalTrafficPolicy != corev1.ServiceExternalTrafficPolicyCluster {
277 t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
278 }
279 }
280
281
282 func Test_TransitionsForTrafficDistribution(t *testing.T) {
283
284
285
286
287
288 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTrafficDistribution, true)()
289
290
291 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
292 defer server.TearDownFn()
293
294 client, err := clientset.NewForConfig(server.ClientConfig)
295 if err != nil {
296 t.Fatalf("Error creating clientset: %v", err)
297 }
298
299 resyncPeriod := 12 * time.Hour
300 informers := informers.NewSharedInformerFactory(client, resyncPeriod)
301
302 ctx := ktesting.Init(t)
303 defer ctx.Cancel("test has completed")
304 epsController := endpointslice.NewController(
305 ctx,
306 informers.Core().V1().Pods(),
307 informers.Core().V1().Services(),
308 informers.Core().V1().Nodes(),
309 informers.Discovery().V1().EndpointSlices(),
310 int32(100),
311 client,
312 1*time.Second,
313 )
314
315 informers.Start(ctx.Done())
316 go epsController.Run(ctx, 1)
317
318
319
320
321
322 ns := framework.CreateNamespaceOrDie(client, "test-service-traffic-distribution", t)
323 defer framework.DeleteNamespaceOrDie(client, ns, t)
324
325 node := &corev1.Node{
326 ObjectMeta: metav1.ObjectMeta{
327 Name: "fake-node",
328 Labels: map[string]string{
329 corev1.LabelTopologyZone: "fake-zone-1",
330 },
331 },
332 }
333
334 pod := &corev1.Pod{
335 ObjectMeta: metav1.ObjectMeta{
336 Name: "test-pod",
337 Namespace: ns.GetName(),
338 Labels: map[string]string{
339 "foo": "bar",
340 },
341 },
342 Spec: corev1.PodSpec{
343 NodeName: node.GetName(),
344 Containers: []corev1.Container{
345 {
346 Name: "fake-name",
347 Image: "fake-image",
348 Ports: []corev1.ContainerPort{
349 {
350 Name: "port-443",
351 ContainerPort: 443,
352 },
353 },
354 },
355 },
356 },
357 Status: corev1.PodStatus{
358 Phase: corev1.PodRunning,
359 Conditions: []corev1.PodCondition{
360 {
361 Type: corev1.PodReady,
362 Status: corev1.ConditionTrue,
363 },
364 },
365 PodIP: "10.0.0.1",
366 PodIPs: []corev1.PodIP{
367 {
368 IP: "10.0.0.1",
369 },
370 },
371 },
372 }
373
374 svc := &corev1.Service{
375 ObjectMeta: metav1.ObjectMeta{
376 Name: "test-service",
377 Namespace: ns.GetName(),
378 },
379 Spec: corev1.ServiceSpec{
380 Selector: map[string]string{
381 "foo": "bar",
382 },
383 Ports: []corev1.ServicePort{
384 {Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
385 },
386 },
387 }
388
389 _, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
390 if err != nil {
391 t.Fatalf("Failed to create test node: %v", err)
392 }
393 _, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{})
394 if err != nil {
395 t.Fatalf("Failed to create test ready pod: %v", err)
396 }
397 _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
398 if err != nil {
399 t.Fatalf("Failed to update status for test pod to Ready: %v", err)
400 }
401 _, err = client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{})
402 if err != nil {
403 t.Fatalf("Failed to create test service: %v", err)
404 }
405
406
407
408
409
410
411
412
413
414 logsBuffer := &bytes.Buffer{}
415
416 endpointSlicesHaveNoHints := func(ctx context.Context) (bool, error) {
417 slices, err := client.DiscoveryV1().EndpointSlices(ns.GetName()).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.GetName())})
418 if err != nil {
419 fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", svc.GetName(), err)
420 return false, nil
421 }
422 if slices == nil || len(slices.Items) == 0 {
423 fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", svc.GetName())
424 return false, nil
425 }
426 fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 ))
427
428 for _, slice := range slices.Items {
429 for _, endpoint := range slice.Endpoints {
430 var ip string
431 if len(endpoint.Addresses) > 0 {
432 ip = endpoint.Addresses[0]
433 }
434 if endpoint.Hints != nil && len(endpoint.Hints.ForZones) != 0 {
435 fmt.Fprintf(logsBuffer, "endpoint with ip %v has hint %+v, want no hint\n", ip, endpoint.Hints)
436 return false, nil
437 }
438 }
439 }
440 return true, nil
441 }
442
443 err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints)
444 if err != nil {
445 t.Logf("logsBuffer=\n%v", logsBuffer)
446 t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err)
447 }
448 logsBuffer.Reset()
449
450
451
452
453
454
455
456 trafficDist := corev1.ServiceTrafficDistributionPreferClose
457 svc.Spec.TrafficDistribution = &trafficDist
458 _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{})
459 if err != nil {
460 t.Fatalf("Failed to update test service with 'trafficDistribution: PreferLocal': %v", err)
461 }
462
463 endpointSlicesHaveSameZoneHints := func(ctx context.Context) (bool, error) {
464 slices, err := client.DiscoveryV1().EndpointSlices(ns.GetName()).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.GetName())})
465 if err != nil {
466 fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", svc.GetName(), err)
467 return false, nil
468 }
469 if slices == nil || len(slices.Items) == 0 {
470 fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", svc.GetName())
471 return false, nil
472 }
473 fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 ))
474
475 for _, slice := range slices.Items {
476 for _, endpoint := range slice.Endpoints {
477 var ip string
478 if len(endpoint.Addresses) > 0 {
479 ip = endpoint.Addresses[0]
480 }
481 var zone string
482 if endpoint.Zone != nil {
483 zone = *endpoint.Zone
484 }
485 if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone {
486 fmt.Fprintf(logsBuffer, "endpoint with ip %v does not have the correct hint, want hint for zone %q\n", ip, zone)
487 return false, nil
488 }
489 }
490 }
491 return true, nil
492 }
493
494 err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveSameZoneHints)
495 if err != nil {
496 t.Logf("logsBuffer=\n%v", logsBuffer)
497 t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err)
498 }
499 logsBuffer.Reset()
500
501
502
503
504
505
506
507
508
509 svc.Annotations = map[string]string{corev1.AnnotationTopologyMode: "Auto"}
510 _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{})
511 if err != nil {
512 t.Fatalf("Failed to update test service with 'service.kubernetes.io/topology-mode=Auto' annotation: %v", err)
513 }
514
515 err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints)
516 if err != nil {
517 t.Logf("logsBuffer=\n%v", logsBuffer)
518 t.Fatalf("Error waiting for EndpointSlices to have no hints: %v", err)
519 }
520 logsBuffer.Reset()
521
522
523
524
525
526
527
528
529 svc.Annotations = map[string]string{}
530 _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{})
531 if err != nil {
532 t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err)
533 }
534
535 err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveSameZoneHints)
536 if err != nil {
537 t.Logf("logsBuffer=\n%v", logsBuffer)
538 t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err)
539 }
540 logsBuffer.Reset()
541
542
543
544
545
546
547 svc.Spec.TrafficDistribution = nil
548 _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{})
549 if err != nil {
550 t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err)
551 }
552
553 err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints)
554 if err != nil {
555 t.Logf("logsBuffer=\n%v", logsBuffer)
556 t.Fatalf("Error waiting for EndpointSlices to have no hints: %v", err)
557 }
558 logsBuffer.Reset()
559 }
560
View as plain text