1
16
17 package network
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "net"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 discoveryv1 "k8s.io/api/discovery/v1"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 types "k8s.io/apimachinery/pkg/types"
31 "k8s.io/apimachinery/pkg/util/intstr"
32 "k8s.io/apimachinery/pkg/util/sets"
33 "k8s.io/apimachinery/pkg/util/wait"
34 "k8s.io/apimachinery/pkg/watch"
35 clientset "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/util/retry"
37 "k8s.io/kubernetes/test/e2e/framework"
38 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
39 "k8s.io/kubernetes/test/e2e/network/common"
40 imageutils "k8s.io/kubernetes/test/utils/image"
41 admissionapi "k8s.io/pod-security-admission/api"
42 "k8s.io/utils/pointer"
43
44 "github.com/onsi/ginkgo/v2"
45 "github.com/onsi/gomega"
46 )
47
48 var _ = common.SIGDescribe("EndpointSlice", func() {
49 f := framework.NewDefaultFramework("endpointslice")
50 f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
51
52 var cs clientset.Interface
53 var podClient *e2epod.PodClient
54
55 ginkgo.BeforeEach(func() {
56 cs = f.ClientSet
57 podClient = e2epod.NewPodClient(f)
58 })
59
60
69 framework.ConformanceIt("should have Endpoints and EndpointSlices pointing to API Server", func(ctx context.Context) {
70 namespace := "default"
71 name := "kubernetes"
72
73 _, err := cs.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
74 framework.ExpectNoError(err, "error obtaining API server \"kubernetes\" Service resource on \"default\" namespace")
75
76
77 endpoints, err := cs.CoreV1().Endpoints(namespace).Get(ctx, name, metav1.GetOptions{})
78 framework.ExpectNoError(err, "error obtaining API server \"kubernetes\" Endpoint resource on \"default\" namespace")
79 if len(endpoints.Subsets) == 0 {
80 framework.Failf("Expected at least 1 subset in endpoints, got %d: %#v", len(endpoints.Subsets), endpoints.Subsets)
81 }
82
83 endpointSliceList, err := cs.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{
84 LabelSelector: "kubernetes.io/service-name=" + name,
85 })
86 framework.ExpectNoError(err, "error obtaining API server \"kubernetes\" EndpointSlice resource on \"default\" namespace")
87 if len(endpointSliceList.Items) == 0 {
88 framework.Failf("Expected at least 1 EndpointSlice, got %d: %#v", len(endpoints.Subsets), endpoints.Subsets)
89 }
90
91 if !endpointSlicesEqual(endpoints, endpointSliceList) {
92 framework.Failf("Expected EndpointSlice to have same addresses and port as Endpoints, got %#v: %#v", endpoints, endpointSliceList)
93 }
94
95 })
96
97
105 framework.ConformanceIt("should create and delete Endpoints and EndpointSlices for a Service with a selector specified", func(ctx context.Context) {
106 svc := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{
107 ObjectMeta: metav1.ObjectMeta{
108 Name: "example-empty-selector",
109 },
110 Spec: v1.ServiceSpec{
111 Selector: map[string]string{
112 "does-not-match-anything": "endpoints-and-endpoint-slices-should-still-be-created",
113 },
114 Ports: []v1.ServicePort{{
115 Name: "example",
116 Port: 80,
117 Protocol: v1.ProtocolTCP,
118 }},
119 },
120 })
121
122
123 if err := wait.PollImmediate(2*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
124 _, err := cs.CoreV1().Endpoints(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{})
125 if err != nil {
126 return false, nil
127 }
128 return true, nil
129 }); err != nil {
130 framework.Failf("No Endpoints found for Service %s/%s: %s", svc.Namespace, svc.Name, err)
131 }
132
133
134 var endpointSlice discoveryv1.EndpointSlice
135 if err := wait.PollImmediate(2*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
136 endpointSliceList, err := cs.DiscoveryV1().EndpointSlices(svc.Namespace).List(ctx, metav1.ListOptions{
137 LabelSelector: "kubernetes.io/service-name=" + svc.Name,
138 })
139 if err != nil {
140 return false, err
141 }
142 if len(endpointSliceList.Items) == 0 {
143 return false, nil
144 }
145 endpointSlice = endpointSliceList.Items[0]
146 return true, nil
147 }); err != nil {
148 framework.Failf("No EndpointSlice found for Service %s/%s: %s", svc.Namespace, svc.Name, err)
149 }
150
151
152 managedBy, ok := endpointSlice.Labels[discoveryv1.LabelManagedBy]
153 expectedManagedBy := "endpointslice-controller.k8s.io"
154 if !ok {
155 framework.Failf("Expected EndpointSlice to have %s label, got %#v", discoveryv1.LabelManagedBy, endpointSlice.Labels)
156 } else if managedBy != expectedManagedBy {
157 framework.Failf("Expected EndpointSlice to have %s label with %s value, got %s", discoveryv1.LabelManagedBy, expectedManagedBy, managedBy)
158 }
159 if len(endpointSlice.Endpoints) != 0 {
160 framework.Failf("Expected EndpointSlice to have 0 endpoints, got %d: %#v", len(endpointSlice.Endpoints), endpointSlice.Endpoints)
161 }
162
163 err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
164 framework.ExpectNoError(err, "error deleting Service")
165
166
167 if err := wait.PollImmediate(2*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
168 _, err := cs.CoreV1().Endpoints(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{})
169 if err != nil {
170 if apierrors.IsNotFound(err) {
171 return true, nil
172 }
173 return false, err
174 }
175 return false, nil
176 }); err != nil {
177 framework.Failf("Endpoints resource not deleted after Service %s/%s was deleted: %s", svc.Namespace, svc.Name, err)
178 }
179
180
181
182
183
184 if err := wait.PollImmediate(2*time.Second, 90*time.Second, func() (bool, error) {
185 endpointSliceList, err := cs.DiscoveryV1().EndpointSlices(svc.Namespace).List(ctx, metav1.ListOptions{
186 LabelSelector: "kubernetes.io/service-name=" + svc.Name,
187 })
188 if err != nil {
189 return false, err
190 }
191 if len(endpointSliceList.Items) == 0 {
192 return true, nil
193 }
194 return false, nil
195 }); err != nil {
196 framework.Failf("EndpointSlice resource not deleted after Service %s/%s was deleted: %s", svc.Namespace, svc.Name, err)
197 }
198 })
199
200
208 framework.ConformanceIt("should create Endpoints and EndpointSlices for Pods matching a Service", func(ctx context.Context) {
209 labelPod1 := "pod1"
210 labelPod2 := "pod2"
211 labelPod3 := "pod3"
212 labelShared12 := "shared12"
213 labelValue := "on"
214
215 pod1 := podClient.Create(ctx, &v1.Pod{
216 ObjectMeta: metav1.ObjectMeta{
217 Name: "pod1",
218 Labels: map[string]string{
219 labelPod1: labelValue,
220 labelShared12: labelValue,
221 },
222 },
223 Spec: v1.PodSpec{
224 Containers: []v1.Container{
225 {
226 Name: "container1",
227 Image: imageutils.GetE2EImage(imageutils.Nginx),
228 Ports: []v1.ContainerPort{{
229 Name: "example-name",
230 ContainerPort: int32(3000),
231 }},
232 },
233 },
234 },
235 })
236
237 pod2 := podClient.Create(ctx, &v1.Pod{
238 ObjectMeta: metav1.ObjectMeta{
239 Name: "pod2",
240 Labels: map[string]string{
241 labelPod2: labelValue,
242 labelShared12: labelValue,
243 },
244 },
245 Spec: v1.PodSpec{
246 Containers: []v1.Container{
247 {
248 Name: "container1",
249 Image: imageutils.GetE2EImage(imageutils.Nginx),
250 Ports: []v1.ContainerPort{{
251 Name: "example-name",
252 ContainerPort: int32(3001),
253 }, {
254 Name: "other-port",
255 ContainerPort: int32(3002),
256 }},
257 },
258 },
259 },
260 })
261
262 svc1 := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{
263 ObjectMeta: metav1.ObjectMeta{
264 Name: "example-int-port",
265 },
266 Spec: v1.ServiceSpec{
267 Selector: map[string]string{labelPod1: labelValue},
268 PublishNotReadyAddresses: true,
269 Ports: []v1.ServicePort{{
270 Name: "example",
271 Port: 80,
272 TargetPort: intstr.FromInt32(3000),
273 Protocol: v1.ProtocolTCP,
274 }},
275 },
276 })
277
278 svc2 := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{
279 ObjectMeta: metav1.ObjectMeta{
280 Name: "example-named-port",
281 },
282 Spec: v1.ServiceSpec{
283 Selector: map[string]string{labelShared12: labelValue},
284 PublishNotReadyAddresses: true,
285 Ports: []v1.ServicePort{{
286 Name: "http",
287 Port: 80,
288 TargetPort: intstr.FromString("example-name"),
289 Protocol: v1.ProtocolTCP,
290 }},
291 },
292 })
293
294 svc3 := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{
295 ObjectMeta: metav1.ObjectMeta{
296 Name: "example-no-match",
297 },
298 Spec: v1.ServiceSpec{
299 Selector: map[string]string{labelPod3: labelValue},
300 PublishNotReadyAddresses: true,
301 Ports: []v1.ServicePort{{
302 Name: "example-no-match",
303 Port: 80,
304 TargetPort: intstr.FromInt32(8080),
305 Protocol: v1.ProtocolTCP,
306 }},
307 },
308 })
309
310 err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 3*time.Minute, true, func(ctx context.Context) (bool, error) {
311 var err error
312 pod1, err = podClient.Get(ctx, pod1.Name, metav1.GetOptions{})
313 if err != nil {
314 return false, err
315 }
316 if len(pod1.Status.PodIPs) == 0 {
317 return false, nil
318 }
319
320 pod2, err = podClient.Get(ctx, pod2.Name, metav1.GetOptions{})
321 if err != nil {
322 return false, err
323 }
324 if len(pod2.Status.PodIPs) == 0 {
325 return false, nil
326 }
327
328 return true, nil
329 })
330 framework.ExpectNoError(err, "timed out waiting for Pods to have IPs assigned")
331
332 ginkgo.By("referencing a single matching pod")
333 expectEndpointsAndSlices(ctx, cs, f.Namespace.Name, svc1, []*v1.Pod{pod1}, 1, 1, false)
334
335 ginkgo.By("referencing matching pods with named port")
336 expectEndpointsAndSlices(ctx, cs, f.Namespace.Name, svc2, []*v1.Pod{pod1, pod2}, 2, 2, true)
337
338 ginkgo.By("creating empty Endpoints and EndpointSlices for no matching Pods")
339 expectEndpointsAndSlices(ctx, cs, f.Namespace.Name, svc3, []*v1.Pod{}, 0, 1, false)
340
341
342
343 ginkgo.By("recreating EndpointSlices after they've been deleted")
344 deleteEndpointSlices(ctx, cs, f.Namespace.Name, svc2)
345 expectEndpointsAndSlices(ctx, cs, f.Namespace.Name, svc2, []*v1.Pod{pod1, pod2}, 2, 2, true)
346 })
347
348
356 framework.ConformanceIt("should support creating EndpointSlice API operations", func(ctx context.Context) {
357
358 ns := f.Namespace.Name
359 epsVersion := "v1"
360 epsClient := f.ClientSet.DiscoveryV1().EndpointSlices(ns)
361
362 epsTemplate := &discoveryv1.EndpointSlice{
363 ObjectMeta: metav1.ObjectMeta{GenerateName: "e2e-example-ing",
364 Labels: map[string]string{
365 "special-label": f.UniqueName,
366 }},
367 AddressType: discoveryv1.AddressTypeIPv4,
368 Endpoints: []discoveryv1.Endpoint{
369 {Addresses: []string{"1.2.3.4", "5.6.7.8"}},
370 {Addresses: []string{"2.2.3.4", "6.6.7.8"}},
371 },
372 }
373
374 ginkgo.By("getting /apis")
375 {
376 discoveryGroups, err := f.ClientSet.Discovery().ServerGroups()
377 framework.ExpectNoError(err)
378 found := false
379 for _, group := range discoveryGroups.Groups {
380 if group.Name == discoveryv1.GroupName {
381 for _, version := range group.Versions {
382 if version.Version == epsVersion {
383 found = true
384 break
385 }
386 }
387 }
388 }
389 if !found {
390 framework.Failf("expected discovery API group/version, got %#v", discoveryGroups.Groups)
391 }
392 }
393
394 ginkgo.By("getting /apis/discovery.k8s.io")
395 {
396 group := &metav1.APIGroup{}
397 err := f.ClientSet.Discovery().RESTClient().Get().AbsPath("/apis/discovery.k8s.io").Do(ctx).Into(group)
398 framework.ExpectNoError(err)
399 found := false
400 for _, version := range group.Versions {
401 if version.Version == epsVersion {
402 found = true
403 break
404 }
405 }
406 if !found {
407 framework.Failf("expected discovery API version, got %#v", group.Versions)
408 }
409 }
410
411 ginkgo.By("getting /apis/discovery.k8s.io" + epsVersion)
412 {
413 resources, err := f.ClientSet.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String())
414 framework.ExpectNoError(err)
415 foundEPS := false
416 for _, resource := range resources.APIResources {
417 switch resource.Name {
418 case "endpointslices":
419 foundEPS = true
420 }
421 }
422 if !foundEPS {
423 framework.Failf("expected endpointslices, got %#v", resources.APIResources)
424 }
425 }
426
427
428 ginkgo.By("creating")
429 _, err := epsClient.Create(ctx, epsTemplate, metav1.CreateOptions{})
430 framework.ExpectNoError(err)
431 _, err = epsClient.Create(ctx, epsTemplate, metav1.CreateOptions{})
432 framework.ExpectNoError(err)
433 createdEPS, err := epsClient.Create(ctx, epsTemplate, metav1.CreateOptions{})
434 framework.ExpectNoError(err)
435
436 ginkgo.By("getting")
437 queriedEPS, err := epsClient.Get(ctx, createdEPS.Name, metav1.GetOptions{})
438 framework.ExpectNoError(err)
439 gomega.Expect(queriedEPS.UID).To(gomega.Equal(createdEPS.UID))
440
441 ginkgo.By("listing")
442 epsList, err := epsClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
443 framework.ExpectNoError(err)
444 gomega.Expect(epsList.Items).To(gomega.HaveLen(3), "filtered list should have 3 items")
445
446 ginkgo.By("watching")
447 framework.Logf("starting watch")
448 epsWatch, err := epsClient.Watch(ctx, metav1.ListOptions{ResourceVersion: epsList.ResourceVersion, LabelSelector: "special-label=" + f.UniqueName})
449 framework.ExpectNoError(err)
450
451
452 clusterEPSClient := f.ClientSet.DiscoveryV1().EndpointSlices("")
453 ginkgo.By("cluster-wide listing")
454 clusterEPSList, err := clusterEPSClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
455 framework.ExpectNoError(err)
456 gomega.Expect(clusterEPSList.Items).To(gomega.HaveLen(3), "filtered list should have 3 items")
457
458 ginkgo.By("cluster-wide watching")
459 framework.Logf("starting watch")
460 _, err = clusterEPSClient.Watch(ctx, metav1.ListOptions{ResourceVersion: epsList.ResourceVersion, LabelSelector: "special-label=" + f.UniqueName})
461 framework.ExpectNoError(err)
462
463 ginkgo.By("patching")
464 patchedEPS, err := epsClient.Patch(ctx, createdEPS.Name, types.MergePatchType, []byte(`{"metadata":{"annotations":{"patched":"true"}}}`), metav1.PatchOptions{})
465 framework.ExpectNoError(err)
466 gomega.Expect(patchedEPS.Annotations).To(gomega.HaveKeyWithValue("patched", "true"), "patched object should have the applied annotation")
467
468 ginkgo.By("updating")
469 var epsToUpdate, updatedEPS *discoveryv1.EndpointSlice
470 err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
471 epsToUpdate, err = epsClient.Get(ctx, createdEPS.Name, metav1.GetOptions{})
472 if err != nil {
473 return err
474 }
475 epsToUpdate.Annotations["updated"] = "true"
476 updatedEPS, err = epsClient.Update(ctx, epsToUpdate, metav1.UpdateOptions{})
477 return err
478 })
479 framework.ExpectNoError(err)
480 gomega.Expect(updatedEPS.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated object should have the applied annotation")
481
482 framework.Logf("waiting for watch events with expected annotations")
483 for sawAnnotations := false; !sawAnnotations; {
484 select {
485 case evt, ok := <-epsWatch.ResultChan():
486 if !ok {
487 framework.Fail("watch channel should not close")
488 }
489 gomega.Expect(evt.Type).To(gomega.Equal(watch.Modified))
490 watchedEPS, isEPS := evt.Object.(*discoveryv1.EndpointSlice)
491 if !isEPS {
492 framework.Failf("expected EndpointSlice, got %T", evt.Object)
493 }
494 if watchedEPS.Annotations["patched"] == "true" {
495 framework.Logf("saw patched and updated annotations")
496 sawAnnotations = true
497 epsWatch.Stop()
498 } else {
499 framework.Logf("missing expected annotations, waiting: %#v", watchedEPS.Annotations)
500 }
501 case <-time.After(wait.ForeverTestTimeout):
502 framework.Fail("timed out waiting for watch event")
503 }
504 }
505
506 ginkgo.By("deleting")
507
508 err = epsClient.Delete(ctx, createdEPS.Name, metav1.DeleteOptions{})
509 framework.ExpectNoError(err)
510 _, err = epsClient.Get(ctx, createdEPS.Name, metav1.GetOptions{})
511 if !apierrors.IsNotFound(err) {
512 framework.Failf("expected 404, got %v", err)
513 }
514 epsList, err = epsClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
515 framework.ExpectNoError(err)
516 gomega.Expect(epsList.Items).To(gomega.HaveLen(2), "filtered list should have 2 items")
517 for _, eps := range epsList.Items {
518 if eps.Namespace == createdEPS.Namespace && eps.Name == createdEPS.Name {
519 framework.Fail("listing after deleting createdEPS")
520 }
521 }
522
523 ginkgo.By("deleting a collection")
524 err = epsClient.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
525 framework.ExpectNoError(err)
526 epsList, err = epsClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName})
527 framework.ExpectNoError(err)
528 gomega.Expect(epsList.Items).To(gomega.BeEmpty(), "filtered list should have 0 items")
529 })
530
531 ginkgo.It("should support a Service with multiple ports specified in multiple EndpointSlices", func(ctx context.Context) {
532 ns := f.Namespace.Name
533 svc := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{
534 ObjectMeta: metav1.ObjectMeta{
535 Name: "example-custom-endpoints",
536 },
537 Spec: v1.ServiceSpec{
538 Ports: []v1.ServicePort{
539 {
540 Name: "port80",
541 Port: 80,
542 Protocol: v1.ProtocolTCP,
543 },
544 {
545 Name: "port81",
546 Port: 81,
547 Protocol: v1.ProtocolTCP,
548 },
549 },
550 },
551 })
552
553
554 port8090 := []v1.ContainerPort{
555 {
556 ContainerPort: 8090,
557 Protocol: v1.ProtocolTCP,
558 },
559 }
560 port9090 := []v1.ContainerPort{
561 {
562 ContainerPort: 9090,
563 Protocol: v1.ProtocolTCP,
564 },
565 }
566
567 serverPod := e2epod.NewAgnhostPodFromContainers(
568 "", "pod-handle-http-request", nil,
569 e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8090, "netexec", "--http-port", "8090", "--udp-port", "-1"),
570 e2epod.NewAgnhostContainer("container-handle-9090-request", nil, port9090, "netexec", "--http-port", "9090", "--udp-port", "-1"),
571 )
572
573 pod := e2epod.NewPodClient(f).CreateSync(ctx, serverPod)
574
575 if pod.Status.PodIP == "" {
576 framework.Failf("PodIP not assigned for pod %s", pod.Name)
577 }
578
579 addressType := discoveryv1.AddressTypeIPv4
580 if framework.TestContext.ClusterIsIPv6() {
581 addressType = discoveryv1.AddressTypeIPv6
582 }
583
584
585 tcpProtocol := v1.ProtocolTCP
586 readyCondTrue := true
587 epsTemplate := &discoveryv1.EndpointSlice{
588 ObjectMeta: metav1.ObjectMeta{GenerateName: "e2e-custom-slice",
589 Labels: map[string]string{
590 discoveryv1.LabelServiceName: svc.Name,
591 discoveryv1.LabelManagedBy: "e2e-test" + ns,
592 }},
593 AddressType: addressType,
594 Endpoints: []discoveryv1.Endpoint{
595 {
596 Addresses: []string{pod.Status.PodIP},
597 Conditions: discoveryv1.EndpointConditions{Ready: &readyCondTrue},
598 },
599 },
600 }
601
602 ginkgo.By("creating")
603 eps1 := epsTemplate.DeepCopy()
604 eps1.Ports = []discoveryv1.EndpointPort{{
605 Name: pointer.String("port80"),
606 Port: pointer.Int32(8090),
607 Protocol: &tcpProtocol,
608 }}
609
610 _, err := f.ClientSet.DiscoveryV1().EndpointSlices(ns).Create(ctx, eps1, metav1.CreateOptions{})
611 framework.ExpectNoError(err)
612 eps2 := epsTemplate.DeepCopy()
613 eps2.Ports = []discoveryv1.EndpointPort{{
614 Name: pointer.String("port81"),
615 Port: pointer.Int32(9090),
616 Protocol: &tcpProtocol,
617 }}
618
619 _, err = f.ClientSet.DiscoveryV1().EndpointSlices(ns).Create(ctx, eps2, metav1.CreateOptions{})
620 framework.ExpectNoError(err)
621
622
623 ginkgo.By("Creating a pause pods that will try to connect to the webserver")
624 pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
625 e2epod.NewPodClient(f).CreateSync(ctx, pausePod0)
626
627 dest1 := net.JoinHostPort(svc.Spec.ClusterIP, "80")
628 dest2 := net.JoinHostPort(svc.Spec.ClusterIP, "81")
629 execHostnameTest(*pausePod0, dest1, serverPod.Name)
630 execHostnameTest(*pausePod0, dest2, serverPod.Name)
631
632 })
633
634 ginkgo.It("should support a Service with multiple endpoint IPs specified in multiple EndpointSlices", func(ctx context.Context) {
635 ns := f.Namespace.Name
636 svc := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{
637 ObjectMeta: metav1.ObjectMeta{
638 Name: "example-custom-endpoints",
639 },
640 Spec: v1.ServiceSpec{
641 Ports: []v1.ServicePort{
642 {
643 Name: "port80",
644 Port: 80,
645 Protocol: v1.ProtocolTCP,
646 },
647 {
648 Name: "port81",
649 Port: 81,
650 Protocol: v1.ProtocolTCP,
651 },
652 },
653 },
654 })
655
656
657 port8090 := []v1.ContainerPort{
658 {
659 ContainerPort: 8090,
660 Protocol: v1.ProtocolTCP,
661 },
662 }
663
664 serverPod1 := e2epod.NewAgnhostPodFromContainers(
665 "", "pod1-handle-http-request", nil,
666 e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8090, "netexec", "--http-port", "8090", "--udp-port", "-1"),
667 )
668 pod1 := e2epod.NewPodClient(f).CreateSync(ctx, serverPod1)
669
670 if pod1.Status.PodIP == "" {
671 framework.Failf("PodIP not assigned for pod %s", pod1.Name)
672 }
673
674 serverPod2 := e2epod.NewAgnhostPodFromContainers(
675 "", "pod2-handle-http-request", nil,
676 e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8090, "netexec", "--http-port", "8090", "--udp-port", "-1"),
677 )
678 pod2 := e2epod.NewPodClient(f).CreateSync(ctx, serverPod2)
679
680 if pod2.Status.PodIP == "" {
681 framework.Failf("PodIP not assigned for pod %s", pod2.Name)
682 }
683
684 addressType := discoveryv1.AddressTypeIPv4
685 if framework.TestContext.ClusterIsIPv6() {
686 addressType = discoveryv1.AddressTypeIPv6
687 }
688
689
690 tcpProtocol := v1.ProtocolTCP
691 readyCondTrue := true
692 epsTemplate := &discoveryv1.EndpointSlice{
693 ObjectMeta: metav1.ObjectMeta{GenerateName: "e2e-custom-slice",
694 Labels: map[string]string{
695 discoveryv1.LabelServiceName: svc.Name,
696 discoveryv1.LabelManagedBy: "e2e-test" + ns,
697 }},
698 AddressType: addressType,
699 }
700
701 ginkgo.By("creating")
702 eps1 := epsTemplate.DeepCopy()
703 eps1.Endpoints = []discoveryv1.Endpoint{
704 {
705 Addresses: []string{pod1.Status.PodIP},
706 Conditions: discoveryv1.EndpointConditions{Ready: &readyCondTrue},
707 },
708 }
709 eps1.Ports = []discoveryv1.EndpointPort{{
710 Name: pointer.String("port80"),
711 Port: pointer.Int32(8090),
712 Protocol: &tcpProtocol,
713 }}
714
715 _, err := f.ClientSet.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), eps1, metav1.CreateOptions{})
716 framework.ExpectNoError(err)
717 eps2 := epsTemplate.DeepCopy()
718 eps2.Endpoints = []discoveryv1.Endpoint{
719 {
720 Addresses: []string{pod2.Status.PodIP},
721 Conditions: discoveryv1.EndpointConditions{Ready: &readyCondTrue},
722 },
723 }
724 eps2.Ports = []discoveryv1.EndpointPort{{
725 Name: pointer.String("port81"),
726 Port: pointer.Int32(8090),
727 Protocol: &tcpProtocol,
728 }}
729 _, err = f.ClientSet.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), eps2, metav1.CreateOptions{})
730 framework.ExpectNoError(err)
731
732
733 ginkgo.By("Creating a pause pods that will try to connect to the webserver")
734 pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
735 e2epod.NewPodClient(f).CreateSync(ctx, pausePod0)
736
737 dest1 := net.JoinHostPort(svc.Spec.ClusterIP, "80")
738 dest2 := net.JoinHostPort(svc.Spec.ClusterIP, "81")
739 execHostnameTest(*pausePod0, dest1, serverPod1.Name)
740 execHostnameTest(*pausePod0, dest2, serverPod2.Name)
741
742 })
743
744 })
745
746
747
748
749
750
751
752 func expectEndpointsAndSlices(ctx context.Context, cs clientset.Interface, ns string, svc *v1.Service, pods []*v1.Pod, numSubsets, numSlices int, namedPort bool) {
753 endpointSlices := []discoveryv1.EndpointSlice{}
754 if err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
755 endpointSlicesFound, hasMatchingSlices := hasMatchingEndpointSlices(ctx, cs, ns, svc.Name, len(pods), numSlices)
756 if !hasMatchingSlices {
757 return false, nil
758 }
759 endpointSlices = endpointSlicesFound
760 return true, nil
761 }); err != nil {
762 framework.Failf("Timed out waiting for EndpointSlices to match expectations: %v", err)
763 }
764
765 endpoints := &v1.Endpoints{}
766 if err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
767 endpointsFound, hasMatchingEndpoints := hasMatchingEndpoints(ctx, cs, ns, svc.Name, len(pods), numSubsets)
768 if !hasMatchingEndpoints {
769 framework.Logf("Matching Endpoints not found")
770 return false, nil
771 }
772 endpoints = endpointsFound
773 return true, nil
774 }); err != nil {
775 framework.Failf("Timed out waiting for Endpoints to match expectations: %v", err)
776 }
777
778 podsByIP := map[string]*v1.Pod{}
779 for _, pod := range pods {
780 podsByIP[pod.Status.PodIP] = pod
781 if len(pod.Spec.Containers) != 1 {
782 framework.Failf("Expected pod to have 1 container, got %d", len(pod.Spec.Containers))
783 }
784 }
785
786 if endpoints.Name != svc.Name {
787 framework.Failf("Expected Endpoints name to be %s, got %s", svc.Name, endpoints.Name)
788 }
789
790 totalEndpointAddresses := 0
791 for _, subset := range endpoints.Subsets {
792 addresses := append(subset.Addresses, subset.NotReadyAddresses...)
793 totalEndpointAddresses += len(addresses)
794
795 if len(subset.Ports) != len(svc.Spec.Ports) {
796 framework.Failf("Expected subset to have %d ports, got %d", len(svc.Spec.Ports), len(subset.Ports))
797 }
798
799
800
801 if !namedPort {
802 for i, subsetPort := range subset.Ports {
803 svcPort := svc.Spec.Ports[i]
804 if subsetPort.Name != svcPort.Name {
805 framework.Failf("Expected port name to be %s, got %s", svcPort.Name, subsetPort.Name)
806 }
807 if subsetPort.Protocol != svcPort.Protocol {
808 framework.Failf("Expected protocol to be %s, got %s", svcPort.Protocol, subsetPort.Protocol)
809 }
810 if subsetPort.Port != svcPort.TargetPort.IntVal {
811 framework.Failf("Expected port to be %d, got %d", svcPort.TargetPort.IntVal, subsetPort.Port)
812 }
813 }
814 }
815
816 for _, address := range addresses {
817 pod, ok := podsByIP[address.IP]
818 if !ok {
819 framework.Failf("Unexpected address with IP: %s", address.IP)
820 }
821
822 ensurePodTargetRef(pod, address.TargetRef)
823
824
825
826 if namedPort {
827 container := pod.Spec.Containers[0]
828 for _, port := range container.Ports {
829 if port.Name == svc.Spec.Ports[0].TargetPort.String() {
830 subsetPort := subset.Ports[0]
831 if subsetPort.Port != port.ContainerPort {
832 framework.Failf("Expected subset port to be %d, got %d", port.ContainerPort, subsetPort.Port)
833 }
834 if subsetPort.Name != svc.Spec.Ports[0].Name {
835 framework.Failf("Expected subset port name to be %s, got %s", svc.Spec.Ports[0].Name, subsetPort.Name)
836 }
837 }
838 }
839 }
840 }
841 }
842
843 if len(pods) != totalEndpointAddresses {
844 framework.Failf("Expected %d addresses, got %d", len(pods), totalEndpointAddresses)
845 }
846
847 if len(pods) == 0 && len(endpointSlices) != 1 {
848 framework.Failf("Expected 1 EndpointSlice, got %d", len(endpointSlices))
849 }
850
851
852
853 esAddresses := sets.NewString()
854 for _, endpointSlice := range endpointSlices {
855 for _, endpoint := range endpointSlice.Endpoints {
856 esAddresses.Insert(endpoint.Addresses[0])
857 }
858 if len(pods) == 0 && len(endpointSlice.Ports) != 0 {
859 framework.Failf("Expected EndpointSlice to have 0 ports, got %d", len(endpointSlice.Ports))
860 }
861 if len(pods) > 0 && len(endpointSlice.Ports) != len(svc.Spec.Ports) {
862 framework.Failf("Expected EndpointSlice to have %d ports, got %d", len(svc.Spec.Ports), len(endpointSlice.Ports))
863 }
864
865
866
867 if !namedPort {
868 for i, esPort := range endpointSlice.Ports {
869 svcPort := svc.Spec.Ports[i]
870 if *esPort.Name != svcPort.Name {
871 framework.Failf("Expected port name to be %s, got %s", svcPort.Name, *esPort.Name)
872 }
873 if *esPort.Protocol != svcPort.Protocol {
874 framework.Failf("Expected protocol to be %s, got %s", svcPort.Protocol, *esPort.Protocol)
875 }
876 if *esPort.Port != svcPort.TargetPort.IntVal {
877 framework.Failf("Expected port to be %d, got %d", svcPort.TargetPort.IntVal, *esPort.Port)
878 }
879 }
880 }
881
882 for _, endpoint := range endpointSlice.Endpoints {
883 if len(endpoint.Addresses) == 0 {
884 framework.Failf("Expected EndpointSlice endpoint to have at least 1 address")
885 }
886 pod, ok := podsByIP[endpoint.Addresses[0]]
887 if !ok {
888 framework.Failf("Unexpected address with IP: %s", endpoint.Addresses[0])
889 }
890
891 ensurePodTargetRef(pod, endpoint.TargetRef)
892
893
894
895 if namedPort {
896 container := pod.Spec.Containers[0]
897 for _, port := range container.Ports {
898 if port.Name == svc.Spec.Ports[0].TargetPort.String() {
899 esPort := endpointSlice.Ports[0]
900 if *esPort.Port != port.ContainerPort {
901 framework.Failf("Expected EndpointSlice port to be %d, got %d", port.ContainerPort, *esPort.Port)
902 }
903 if *esPort.Name != svc.Spec.Ports[0].Name {
904 framework.Failf("Expected EndpointSlice port name to be %s, got %s", svc.Spec.Ports[0].Name, *esPort.Name)
905 }
906 }
907 }
908 }
909 }
910 }
911
912 if len(pods) != esAddresses.Len() {
913 framework.Failf("Expected %d addresses, got %d", len(pods), esAddresses.Len())
914 }
915 }
916
917
918 func deleteEndpointSlices(ctx context.Context, cs clientset.Interface, ns string, svc *v1.Service) {
919 listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.Name)}
920 esList, err := cs.DiscoveryV1().EndpointSlices(ns).List(ctx, listOptions)
921 framework.ExpectNoError(err, "Error fetching EndpointSlices for %s/%s Service", ns, svc.Name)
922
923 for _, endpointSlice := range esList.Items {
924 err := cs.DiscoveryV1().EndpointSlices(ns).Delete(ctx, endpointSlice.Name, metav1.DeleteOptions{})
925 framework.ExpectNoError(err, "Error deleting %s/%s EndpointSlice", ns, endpointSlice.Name)
926 }
927 }
928
929
930
931
932 func hasMatchingEndpointSlices(ctx context.Context, cs clientset.Interface, ns, svcName string, numEndpoints, numSlices int) ([]discoveryv1.EndpointSlice, bool) {
933 listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svcName)}
934 esList, err := cs.DiscoveryV1().EndpointSlices(ns).List(ctx, listOptions)
935 framework.ExpectNoError(err, "Error fetching EndpointSlice for Service %s/%s", ns, svcName)
936
937 if len(esList.Items) == 0 {
938 framework.Logf("EndpointSlice for Service %s/%s not found", ns, svcName)
939 return []discoveryv1.EndpointSlice{}, false
940 }
941
942
943
944
945 if len(esList.Items) < numSlices {
946 framework.Logf("Expected at least %d EndpointSlices for Service %s/%s, got %d", numSlices, ns, svcName, len(esList.Items))
947 for i, epSlice := range esList.Items {
948 epsData, err := json.Marshal(epSlice)
949 if err != nil {
950 framework.Logf("Error marshaling JSON for EndpointSlice: %v", err)
951 } else {
952 framework.Logf("%d - %v", i, string(epsData))
953 }
954 }
955 return esList.Items, false
956 }
957
958 actualNumEndpoints := 0
959 for _, endpointSlice := range esList.Items {
960 actualNumEndpoints += len(endpointSlice.Endpoints)
961 }
962
963
964
965
966 if actualNumEndpoints < numEndpoints {
967 framework.Logf("EndpointSlices for %s/%s Service have %d/%d endpoints", ns, svcName, actualNumEndpoints, numEndpoints)
968 return esList.Items, false
969 }
970
971 return esList.Items, true
972 }
973
974
975
976 func hasMatchingEndpoints(ctx context.Context, cs clientset.Interface, ns, svcName string, numIPs, numSubsets int) (*v1.Endpoints, bool) {
977 endpoints, err := cs.CoreV1().Endpoints(ns).Get(ctx, svcName, metav1.GetOptions{})
978 if err != nil {
979 if apierrors.IsNotFound(err) {
980 framework.Logf("Endpoints for %s/%s Service not found", ns, svcName)
981 return nil, false
982 }
983 framework.ExpectNoError(err, "Error fetching Endpoints for %s/%s Service", ns, svcName)
984 }
985 if len(endpoints.Subsets) != numSubsets {
986 framework.Logf("Endpoints for %s/%s Service with %d/%d Subsets", ns, svcName, len(endpoints.Subsets), numSubsets)
987 return nil, false
988 }
989
990 actualNumIPs := 0
991 for _, endpointSubset := range endpoints.Subsets {
992 actualNumIPs += len(endpointSubset.Addresses) + len(endpointSubset.NotReadyAddresses)
993 }
994 if actualNumIPs != numIPs {
995 framework.Logf("Endpoints for %s/%s Service with %d/%d IPs", ns, svcName, actualNumIPs, numIPs)
996 return nil, false
997 }
998
999 return endpoints, true
1000 }
1001
1002
1003 func ensurePodTargetRef(pod *v1.Pod, targetRef *v1.ObjectReference) {
1004 if targetRef == nil {
1005 framework.Failf("Expected TargetRef to not be nil")
1006 }
1007 if targetRef.Kind != "Pod" {
1008 framework.Failf("Expected TargetRef.Kind to be Pod, got %s", targetRef.Kind)
1009 }
1010 if targetRef.Namespace != pod.Namespace {
1011 framework.Failf("Expected TargetRef.Namespace to be %s, got %s", pod.Namespace, targetRef.Namespace)
1012 }
1013 if targetRef.Name != pod.Name {
1014 framework.Failf("Expected TargetRef.Name to be %s, got %s", pod.Name, targetRef.Name)
1015 }
1016 if targetRef.UID != pod.UID {
1017 framework.Failf("Expected TargetRef.UID to be %s, got %s", pod.UID, targetRef.UID)
1018 }
1019 }
1020
1021
1022 func createServiceReportErr(ctx context.Context, cs clientset.Interface, ns string, service *v1.Service) *v1.Service {
1023 svc, err := cs.CoreV1().Services(ns).Create(ctx, service, metav1.CreateOptions{})
1024 framework.ExpectNoError(err, "error deleting Service")
1025 return svc
1026 }
1027
1028
1029
1030 func endpointSlicesEqual(endpoints *v1.Endpoints, endpointSliceList *discoveryv1.EndpointSliceList) bool {
1031
1032 epAddresses := sets.NewString()
1033 epPorts := sets.NewInt32()
1034 for _, subset := range endpoints.Subsets {
1035 for _, addr := range subset.Addresses {
1036 epAddresses.Insert(addr.IP)
1037 }
1038 for _, addr := range subset.NotReadyAddresses {
1039 epAddresses.Insert(addr.IP)
1040 }
1041 for _, port := range subset.Ports {
1042 epPorts.Insert(port.Port)
1043 }
1044 }
1045 framework.Logf("Endpoints addresses: %v , ports: %v", epAddresses.List(), epPorts.List())
1046
1047
1048
1049
1050 var addrType discoveryv1.AddressType
1051 if framework.TestContext.ClusterIsIPv6() {
1052 addrType = discoveryv1.AddressTypeIPv6
1053 } else {
1054 addrType = discoveryv1.AddressTypeIPv4
1055 }
1056
1057
1058 sliceAddresses := sets.NewString()
1059 slicePorts := sets.NewInt32()
1060 for _, slice := range endpointSliceList.Items {
1061 if slice.AddressType != addrType {
1062 framework.Logf("Skipping slice %s: wanted %s family, got %s", slice.Name, addrType, slice.AddressType)
1063 continue
1064 }
1065 for _, s := range slice.Endpoints {
1066 sliceAddresses.Insert(s.Addresses...)
1067 }
1068 for _, ports := range slice.Ports {
1069 if ports.Port != nil {
1070 slicePorts.Insert(*ports.Port)
1071 }
1072 }
1073 }
1074
1075 framework.Logf("EndpointSlices addresses: %v , ports: %v", sliceAddresses.List(), slicePorts.List())
1076 if sliceAddresses.Equal(epAddresses) && slicePorts.Equal(epPorts) {
1077 return true
1078 }
1079 return false
1080 }
1081
View as plain text