/* Copyright 2020 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package endpoints import ( "errors" "fmt" "testing" "time" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/utils/ktesting" ) func TestEndpointUpdates(t *testing.T) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) defer server.TearDownFn() client, err := clientset.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("Error creating clientset: %v", err) } informers := informers.NewSharedInformerFactory(client, 0) tCtx := ktesting.Init(t) epController := endpoint.NewEndpointController( tCtx, informers.Core().V1().Pods(), informers.Core().V1().Services(), informers.Core().V1().Endpoints(), client, 0) // Start informer and controllers informers.Start(tCtx.Done()) go epController.Run(tCtx, 1) // Create namespace ns := framework.CreateNamespaceOrDie(client, "test-endpoints-updates", t) defer framework.DeleteNamespaceOrDie(client, ns, t) // Create a pod with labels pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod", Namespace: ns.Name, Labels: labelMap(), }, Spec: v1.PodSpec{ NodeName: "fakenode", Containers: []v1.Container{ { Name: "fake-name", Image: "fakeimage", }, }, }, } createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create pod %s: %v", pod.Name, err) } // Set pod IPs createdPod.Status = v1.PodStatus{ Phase: v1.PodRunning, PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}}, } _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err) } // Create a service associated to the pod svc := newService(ns.Name, "foo1") svc1, err := client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create service %s: %v", svc.Name, err) } // Obtain ResourceVersion of the new endpoint created var resVersion string if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) { endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{}) if err != nil { t.Logf("error fetching endpoints: %v", err) return false, nil } resVersion = endpoints.ObjectMeta.ResourceVersion return true, nil }); err != nil { t.Fatalf("endpoints not found: %v", err) } // Force recomputation on the endpoint controller svc1.SetAnnotations(map[string]string{"foo": "bar"}) _, err = client.CoreV1().Services(ns.Name).Update(tCtx, svc1, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Failed to update service %s: %v", svc1.Name, err) } // Create a new service and wait until it has been processed, // this way we can be sure that the endpoint for the original service // was recomputed before asserting, since we only have 1 worker // in the endpoint controller svc2 := newService(ns.Name, "foo2") _, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc2, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create service %s: %v", svc.Name, err) } if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) { _, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc2.Name, metav1.GetOptions{}) if err != nil { t.Logf("error fetching endpoints: %v", err) return false, nil } return true, nil }); err != nil { t.Fatalf("endpoints not found: %v", err) } // the endpoint controller should not update the endpoint created for the original // service since nothing has changed, the resource version has to be the same endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("error fetching endpoints: %v", err) } if resVersion != endpoints.ObjectMeta.ResourceVersion { t.Fatalf("endpoints resource version does not match, expected %s received %s", resVersion, endpoints.ObjectMeta.ResourceVersion) } } // TestExternalNameToClusterIPTransition tests that Service of type ExternalName // does not get endpoints, and after transition to ClusterIP, service gets endpoint, // without headless label func TestExternalNameToClusterIPTransition(t *testing.T) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) defer server.TearDownFn() client, err := clientset.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("Error creating clientset: %v", err) } informers := informers.NewSharedInformerFactory(client, 0) tCtx := ktesting.Init(t) epController := endpoint.NewEndpointController( tCtx, informers.Core().V1().Pods(), informers.Core().V1().Services(), informers.Core().V1().Endpoints(), client, 0) // Start informer and controllers informers.Start(tCtx.Done()) go epController.Run(tCtx, 1) // Create namespace ns := framework.CreateNamespaceOrDie(client, "test-endpoints-updates", t) defer framework.DeleteNamespaceOrDie(client, ns, t) // Create a pod with labels pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod", Namespace: ns.Name, Labels: labelMap(), }, Spec: v1.PodSpec{ NodeName: "fakenode", Containers: []v1.Container{ { Name: "fake-name", Image: "fakeimage", }, }, }, } createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create pod %s: %v", pod.Name, err) } // Set pod IPs createdPod.Status = v1.PodStatus{ Phase: v1.PodRunning, PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}}, } _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err) } // Create an ExternalName service associated to the pod svc := newExternalNameService(ns.Name, "foo1") svc1, err := client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create service %s: %v", svc.Name, err) } err = wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) { endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{}) if err == nil { t.Errorf("expected no endpoints for externalName service, got: %v", endpoints) return true, nil } return false, nil }) if err == nil { t.Errorf("expected error waiting for endpoints") } // update service to ClusterIP type and verify endpoint was created svc1.Spec.Type = v1.ServiceTypeClusterIP _, err = client.CoreV1().Services(ns.Name).Update(tCtx, svc1, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Failed to update service %s: %v", svc1.Name, err) } if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) { ep, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc1.Name, metav1.GetOptions{}) if err != nil { t.Logf("no endpoints found, error: %v", err) return false, nil } t.Logf("endpoint %s was successfully created", svc1.Name) if _, ok := ep.Labels[v1.IsHeadlessService]; ok { t.Errorf("ClusterIP endpoint should not have headless label, got: %v", ep) } return true, nil }); err != nil { t.Fatalf("endpoints not found: %v", err) } } // TestEndpointWithTerminatingPod tests that terminating pods are NOT included in Endpoints. // This capability is only available in the newer EndpointSlice API and there are no plans to // include it for Endpoints. This test can be removed in the future if we decide to include // terminating endpoints in Endpoints, but in the mean time this test ensures we do not change // this behavior accidentally. func TestEndpointWithTerminatingPod(t *testing.T) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) defer server.TearDownFn() client, err := clientset.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("Error creating clientset: %v", err) } informers := informers.NewSharedInformerFactory(client, 0) tCtx := ktesting.Init(t) epController := endpoint.NewEndpointController( tCtx, informers.Core().V1().Pods(), informers.Core().V1().Services(), informers.Core().V1().Endpoints(), client, 0) // Start informer and controllers informers.Start(tCtx.Done()) go epController.Run(tCtx, 1) // Create namespace ns := framework.CreateNamespaceOrDie(client, "test-endpoints-terminating", t) defer framework.DeleteNamespaceOrDie(client, ns, t) // Create a pod with labels pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod", Labels: labelMap(), }, Spec: v1.PodSpec{ NodeName: "fake-node", Containers: []v1.Container{ { Name: "fakename", Image: "fakeimage", Ports: []v1.ContainerPort{ { Name: "port-443", ContainerPort: 443, }, }, }, }, }, Status: v1.PodStatus{ Phase: v1.PodRunning, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, PodIP: "10.0.0.1", PodIPs: []v1.PodIP{ { IP: "10.0.0.1", }, }, }, } createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create pod %s: %v", pod.Name, err) } createdPod.Status = pod.Status _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err) } // Create a service associated to the pod svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "test-service", Namespace: ns.Name, Labels: map[string]string{ "foo": "bar", }, }, Spec: v1.ServiceSpec{ Selector: map[string]string{ "foo": "bar", }, Ports: []v1.ServicePort{ {Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)}, }, }, } _, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create service %s: %v", svc.Name, err) } // poll until associated Endpoints to the previously created Service exists if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) { endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{}) if err != nil { return false, nil } numEndpoints := 0 for _, subset := range endpoints.Subsets { numEndpoints += len(subset.Addresses) } if numEndpoints == 0 { return false, nil } return true, nil }); err != nil { t.Fatalf("endpoints not found: %v", err) } err = client.CoreV1().Pods(ns.Name).Delete(tCtx, pod.Name, metav1.DeleteOptions{}) if err != nil { t.Fatalf("error deleting test pod: %v", err) } // poll until endpoint for deleted Pod is no longer in Endpoints. if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) { // Ensure that the recently deleted Pod exists but with a deletion timestamp. If the Pod does not exist, // we should fail the test since it is no longer validating against a terminating pod. pod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, pod.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { return false, fmt.Errorf("expected Pod %q to exist with deletion timestamp but was not found: %v", pod.Name, err) } if err != nil { return false, nil } if pod.DeletionTimestamp == nil { return false, errors.New("pod did not have deletion timestamp set") } endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{}) if err != nil { return false, nil } numEndpoints := 0 for _, subset := range endpoints.Subsets { numEndpoints += len(subset.Addresses) } if numEndpoints > 0 { return false, nil } return true, nil }); err != nil { t.Fatalf("error checking for no endpoints with terminating pods: %v", err) } } func labelMap() map[string]string { return map[string]string{"foo": "bar"} } // newService returns a service with selector and exposing ports func newService(namespace, name string) *v1.Service { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, Labels: labelMap(), }, Spec: v1.ServiceSpec{ Selector: labelMap(), Ports: []v1.ServicePort{ {Name: "port-1338", Port: 1338, Protocol: "TCP", TargetPort: intstr.FromInt32(1338)}, {Name: "port-1337", Port: 1337, Protocol: "TCP", TargetPort: intstr.FromInt32(1337)}, }, }, } } // newExternalNameService returns an ExternalName service with selector and exposing ports func newExternalNameService(namespace, name string) *v1.Service { svc := newService(namespace, name) svc.Spec.Type = v1.ServiceTypeExternalName svc.Spec.ExternalName = "google.com" return svc }