/* Copyright 2014 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package endpoint import ( "context" "fmt" "net/http" "net/http/httptest" "reflect" "strconv" "testing" "time" "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" clientscheme "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints" api "k8s.io/kubernetes/pkg/apis/core" controllerpkg "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/test/utils/ktesting" utilnet "k8s.io/utils/net" "k8s.io/utils/pointer" ) var alwaysReady = func() bool { return true } var neverReady = func() bool { return false } var emptyNodeName string var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC) var triggerTimeString = triggerTime.Format(time.RFC3339Nano) var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano) var ipv4only = []v1.IPFamily{v1.IPv4Protocol} var ipv6only = []v1.IPFamily{v1.IPv6Protocol} var ipv4ipv6 = []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol} var ipv6ipv4 = []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol} func testPod(namespace string, id int, 
nPorts int, isReady bool, ipFamilies []v1.IPFamily) *v1.Pod { p := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, Name: fmt.Sprintf("pod%d", id), Labels: map[string]string{"foo": "bar"}, ResourceVersion: fmt.Sprint(id), }, Spec: v1.PodSpec{ Containers: []v1.Container{{Ports: []v1.ContainerPort{}}}, }, Status: v1.PodStatus{ Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, } if !isReady { p.Status.Conditions[0].Status = v1.ConditionFalse } for j := 0; j < nPorts; j++ { p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)}) } for _, family := range ipFamilies { var ip string if family == v1.IPv4Protocol { ip = fmt.Sprintf("1.2.3.%d", 4+id) } else { ip = fmt.Sprintf("2000::%d", 4+id) } p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: ip}) } p.Status.PodIP = p.Status.PodIPs[0].IP return p } func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, ipFamilies []v1.IPFamily) { for i := 0; i < nPods+nNotReady; i++ { isReady := i < nPods pod := testPod(namespace, i, nPorts, isReady, ipFamilies) store.Add(pod) } } func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) { for i := 0; i < nPods; i++ { p := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, Name: fmt.Sprintf("pod%d", i), Labels: map[string]string{"foo": "bar"}, }, Spec: v1.PodSpec{ RestartPolicy: restartPolicy, Containers: []v1.Container{{Ports: []v1.ContainerPort{}}}, }, Status: v1.PodStatus{ PodIP: fmt.Sprintf("1.2.3.%d", 4+i), Phase: podPhase, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionFalse, }, }, }, } for j := 0; j < nPorts; j++ { p.Spec.Containers[0].Ports = 
append(p.Spec.Containers[0].Ports, v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)}) } store.Add(p) } } func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) { fakeEndpointsHandler := utiltesting.FakeHandler{ StatusCode: http.StatusOK, ResponseBody: runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}), } mux := http.NewServeMux() if namespace == "" { t.Fatal("namespace cannot be empty") } mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints", &fakeEndpointsHandler) mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints/", &fakeEndpointsHandler) mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { t.Errorf("unexpected request: %v", req.RequestURI) http.Error(res, "", http.StatusNotFound) }) return httptest.NewServer(mux), &fakeEndpointsHandler } // makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All // block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will // be sent in the response. 
func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server { handlerFunc := func(res http.ResponseWriter, req *http.Request) { if controller == nil { res.WriteHeader(http.StatusInternalServerError) res.Write([]byte("controller has not been set yet")) return } if req.Method == "POST" { controller.endpointsStore.Add(endpoint) blockNextAction <- struct{}{} } if req.Method == "DELETE" { go func() { // Delay the deletion of endoints to make endpoint cache out of sync <-blockDelete controller.endpointsStore.Delete(endpoint) controller.onEndpointsDelete(endpoint) }() blockNextAction <- struct{}{} } res.WriteHeader(http.StatusOK) res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}))) } mux := http.NewServeMux() mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints", handlerFunc) mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints/", handlerFunc) mux.HandleFunc("/api/v1/namespaces/"+namespace+"/events", func(res http.ResponseWriter, req *http.Request) {}) mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { t.Errorf("unexpected request: %v", req.RequestURI) http.Error(res, "", http.StatusNotFound) }) return httptest.NewServer(mux) } type endpointController struct { *Controller podStore cache.Store serviceStore cache.Store endpointsStore cache.Store } func newController(ctx context.Context, url string, batchPeriod time.Duration) *endpointController { client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc()) endpoints := NewEndpointController(ctx, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), 
informerFactory.Core().V1().Endpoints(), client, batchPeriod) endpoints.podsSynced = alwaysReady endpoints.servicesSynced = alwaysReady endpoints.endpointsSynced = alwaysReady return &endpointController{ endpoints, informerFactory.Core().V1().Pods().Informer().GetStore(), informerFactory.Core().V1().Services().Informer().GetStore(), informerFactory.Core().V1().Endpoints().Informer().GetStore(), } } func newFakeController(ctx context.Context, batchPeriod time.Duration) (*fake.Clientset, *endpointController) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc()) eController := NewEndpointController( ctx, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), informerFactory.Core().V1().Endpoints(), client, batchPeriod) eController.podsSynced = alwaysReady eController.servicesSynced = alwaysReady eController.endpointsSynced = alwaysReady return client, &endpointController{ eController, informerFactory.Core().V1().Pods().Informer().GetStore(), informerFactory.Core().V1().Services().Informer().GetStore(), informerFactory.Core().V1().Endpoints().Informer().GetStore(), } } func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, Ports: []v1.EndpointPort{{Port: 1000}}, }}, }) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}}, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", 
err) } endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsExistingNilSubsets(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: nil, }) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsExistingEmptySubsets(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{}, }) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() pod0 := testPod(ns, 0, 1, true, ipv4only) pod1 := testPod(ns, 1, 1, false, ipv4only) tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) 
endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{ { IP: pod0.Status.PodIPs[0].IP, NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod0.Name, Namespace: ns, ResourceVersion: "1"}, }, }, NotReadyAddresses: []v1.EndpointAddress{ { IP: pod1.Status.PodIPs[0].IP, NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod1.Name, Namespace: ns, ResourceVersion: "2"}, }, }, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}}, }, }) pod0.ResourceVersion = "3" pod1.ResourceVersion = "4" endpoints.podStore.Add(pod0) endpoints.podStore.Add(pod1) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsNewNoSubsets(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 1) } func TestCheckLeftoverEndpoints(t *testing.T) { ns := metav1.NamespaceDefault testServer, _ := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 
0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, Ports: []v1.EndpointPort{{Port: 1000}}, }}, }) endpoints.checkLeftoverEndpoints() if e, a := 1, endpoints.queue.Len(); e != a { t.Fatalf("Expected %v, got %v", e, a) } got, _ := endpoints.queue.Get() if e, a := ns+"/foo", got; e != a { t.Errorf("Expected %v, got %v", e, a) } } func TestSyncEndpointsProtocolTCP(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{}, Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ v1.IsHeadlessService: "", }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, 
}) endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data) } func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ v1.IsHeadlessService: "", }, }, Subsets: []v1.EndpointSubset{}, }) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncServiceExternalNameType(t *testing.T) { serviceName := "testing-1" namespace := metav1.NamespaceDefault testCases := []struct { desc string service *v1.Service }{ { desc: "External name with selector and ports should not receive endpoints", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80}}, Type: v1.ServiceTypeExternalName, }, }, }, { desc: "External name with ports should not receive endpoints", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace}, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{Port: 80}}, Type: v1.ServiceTypeExternalName, }, }, }, { desc: "External name with selector should not receive endpoints", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Type: v1.ServiceTypeExternalName, }, }, }, { desc: "External name without 
selector and ports should not receive endpoints", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace}, Spec: v1.ServiceSpec{ Type: v1.ServiceTypeExternalName, }, }, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { testServer, endpointsHandler := makeTestServer(t, namespace) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) err := endpoints.serviceStore.Add(tc.service) if err != nil { t.Fatalf("Error adding service to service store: %v", err) } err = endpoints.syncService(tCtx, namespace+"/"+serviceName) if err != nil { t.Fatalf("Error syncing service: %v", err) } endpointsHandler.ValidateRequestCount(t, 0) }) } } func TestSyncEndpointsProtocolUDP(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}}, }}, }) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{}, Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "UDP"}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ v1.IsHeadlessService: "", }, }, Subsets: 
[]v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}}, }}, }) endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data) } func TestSyncEndpointsProtocolSCTP(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}}, }}, }) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{}, Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "SCTP"}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ v1.IsHeadlessService: "", }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}}, }}, }) endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data) } func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { ns := "other" testServer, 
endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{}, }) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{}, Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ v1.IsHeadlessService: "", }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data) } func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{}, }) addPods(endpoints.podStore, ns, 0, 1, 1, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{}, Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", 
TargetPort: intstr.FromInt32(8080)}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ v1.IsHeadlessService: "", }, }, Subsets: []v1.EndpointSubset{{ NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data) } func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{}, }) addPods(endpoints.podStore, ns, 1, 1, 1, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{}, Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ v1.IsHeadlessService: "", }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: 
ns}}}, NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data) } func TestSyncEndpointsItemsPreexisting(t *testing.T) { ns := "bar" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, Ports: []v1.EndpointPort{{Port: 1000}}, }}, }) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ v1.IsHeadlessService: "", }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data) } func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) 
defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "1", Name: "foo", Namespace: ns, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}}, }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsItems(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0*time.Second) addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only) addPods(endpoints.podStore, "blah", 5, 2, 0, ipv4only) // make sure these aren't found! 
endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports: []v1.ServicePort{
				{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
				{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt32(8088)},
			},
		},
	})
	// NOTE(review): the key is spelled out as "other/foo" here, whereas the
	// tests below build it as ns+"/foo" — confirm ns is "other" in the part of
	// this test that precedes this excerpt.
	err := endpoints.syncService(tCtx, "other/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}

	expectedSubsets := []v1.EndpointSubset{{
		Addresses: []v1.EndpointAddress{
			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
			{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
			{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
		},
		Ports: []v1.EndpointPort{
			{Name: "port0", Port: 8080, Protocol: "TCP"},
			{Name: "port1", Port: 8088, Protocol: "TCP"},
		},
	}}
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			ResourceVersion: "",
			Name:            "foo",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: endptspkg.SortSubsets(expectedSubsets),
	})
	endpointsHandler.ValidateRequestCount(t, 1)
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}

// TestSyncEndpointsItemsWithLabels syncs a labelled service that selects three
// ready two-port pods and expects exactly one POST whose Endpoints object
// carries the service's labels plus the v1.IsHeadlessService label.
func TestSyncEndpointsItemsWithLabels(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
	// The same map instance is handed to the stored Service and, further down,
	// to the expected Endpoints object.
	serviceLabels := map[string]string{"foo": "bar"}
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: ns,
			Labels:    serviceLabels,
		},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports: []v1.ServicePort{
				{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
				{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt32(8088)},
			},
		},
	})
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}

	expectedSubsets := []v1.EndpointSubset{{
		Addresses: []v1.EndpointAddress{
			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
			{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
			{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
		},
		Ports: []v1.EndpointPort{
			{Name: "port0", Port: 8080, Protocol: "TCP"},
			{Name: "port1", Port: 8088, Protocol: "TCP"},
		},
	}}

	// Added only after syncService has returned, so the sync itself saw the
	// service labels without this key.
	serviceLabels[v1.IsHeadlessService] = ""
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			ResourceVersion: "",
			Name:            "foo",
			Labels:          serviceLabels,
		},
		Subsets: endptspkg.SortSubsets(expectedSubsets),
	})
	endpointsHandler.ValidateRequestCount(t, 1)
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}

// TestSyncEndpointsItemsPreexistingLabelsChange starts from a stored Endpoints
// object labelled {"foo": "bar"} and a service labelled {"baz": "blah"}, and
// expects a PUT whose labels are the service's labels plus
// v1.IsHeadlessService and whose single address is the one ready pod.
func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
	ns := "bar"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"foo": "bar",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
	serviceLabels := map[string]string{"baz": "blah"}
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: ns,
			Labels:    serviceLabels,
		},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
		},
	})
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}

	// Extend the shared label map only after the sync, to build the expected
	// request body.
	serviceLabels[v1.IsHeadlessService] = ""
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels:          serviceLabels,
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestWaitsForAllInformersToBeSynced2 runs the controller with each
// combination of synced/unsynced informers listed in the table and expects one
// request to the endpoints handler only when the pod, service and endpoints
// informers all report synced; every other row expects zero requests.
func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
	var tests = []struct {
		podsSynced            func() bool
		servicesSynced        func() bool
		endpointsSynced       func() bool
		shouldUpdateEndpoints bool
	}{
		{neverReady, alwaysReady, alwaysReady, false},
		{alwaysReady, neverReady, alwaysReady, false},
		{alwaysReady, alwaysReady, neverReady, false},
		{alwaysReady, alwaysReady, alwaysReady, true},
	}

	for _, test := range tests {
		// Each row runs in its own function literal so the deferred
		// testServer.Close fires at the end of the row, not of the whole test.
		func() {
			ns := "other"
			testServer, endpointsHandler := makeTestServer(t, ns)
			defer testServer.Close()

			tCtx := ktesting.Init(t)
			endpoints := newController(tCtx, testServer.URL, 0*time.Second)
			addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)

			service := &v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector: map[string]string{},
					Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
				},
			}
			endpoints.serviceStore.Add(service)
			endpoints.onServiceUpdate(service)
			endpoints.podsSynced = test.podsSynced
			endpoints.servicesSynced = test.servicesSynced
			endpoints.endpointsSynced = test.endpointsSynced
			endpoints.workerLoopPeriod = 10 * time.Millisecond
			go endpoints.Run(tCtx, 1)

			// cache.WaitForNamedCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
			// To ensure we get all updates, including unexpected ones, we need to wait at least as long as
			// a single cache sync period and worker period, with some fudge room.
			time.Sleep(150 * time.Millisecond)
			if test.shouldUpdateEndpoints {
				// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
				// NOTE(review): the value returned by wait.PollImmediate is discarded, so a
				// timeout here only shows up through the request-count check below.
				wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
					return endpoints.queue.Len() == 0, nil
				})
				endpointsHandler.ValidateRequestCount(t, 1)
			} else {
				endpointsHandler.ValidateRequestCount(t, 0)
			}
		}()
	}
}

// TestSyncEndpointsHeadlessService syncs a service whose ClusterIP is
// api.ClusterIPNone and whose port list is empty. It expects one PUT with the
// ready pod's address, an empty port list and the service label plus
// v1.IsHeadlessService, and it verifies that the Service object handed to the
// store is unchanged by the sync.
func TestSyncEndpointsHeadlessService(t *testing.T) {
	ns := "headless"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
	service := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}},
		Spec: v1.ServiceSpec{
			Selector:  map[string]string{},
			ClusterIP: api.ClusterIPNone,
			Ports:     []v1.ServicePort{},
		},
	}
	// Snapshot taken before the sync; compared against service afterwards.
	originalService := service.DeepCopy()
	endpoints.serviceStore.Add(service)
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"a":                  "b",
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{},
		}},
	})
	if !reflect.DeepEqual(originalService, service) {
		t.Fatalf("syncing endpoints changed service: %s", cmp.Diff(service, originalService))
	}
	endpointsHandler.ValidateRequestCount(t, 1)
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed
// stores one not-ready pod with RestartPolicyNever in phase PodFailed and
// expects the PUT for the service to carry no subsets.
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"foo": "bar",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
		},
	})
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded
// stores one not-ready pod with RestartPolicyNever in phase PodSucceeded and
// expects the PUT for the service to carry no subsets.
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"foo": "bar",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
		},
	})
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded
// stores one not-ready pod with RestartPolicyOnFailure in phase PodSucceeded
// and expects the PUT for the service to carry no subsets.
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"foo": "bar",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
		},
	})
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestSyncEndpointsHeadlessWithoutPort syncs a service with ClusterIP "None"
// and a nil port list, with no Endpoints object in the store beforehand. It
// expects one POST whose single subset holds the ready pod's address and nil
// ports.
func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector:  map[string]string{"foo": "bar"},
			ClusterIP: "None",
			Ports:     nil,
		},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)

	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}
	endpointsHandler.ValidateRequestCount(t, 1)

	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name: "foo",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     nil,
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}

// TestPodToEndpointAddressForService is a table test for
// podToEndpointAddressForService. Each case builds one pod whose PodIPs follow
// tc.ipFamilies, calls the function with tc.service, and then either expects
// an error or checks the IP family, NodeName and TargetRef of the returned
// address.
func TestPodToEndpointAddressForService(t *testing.T) {
	ipv4 := v1.IPv4Protocol
	ipv6 := v1.IPv6Protocol

	testCases := []struct {
		name                   string
		ipFamilies             []v1.IPFamily
		service                v1.Service
		expectedEndpointFamily v1.IPFamily
		expectError            bool
	}{
		{
			name:       "v4 service, in a single stack cluster",
			ipFamilies: ipv4only,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "10.0.0.1",
				},
			},
			expectedEndpointFamily: ipv4,
		},
		{
			name:       "v4 service, in a dual stack cluster",
			ipFamilies: ipv4ipv6,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "10.0.0.1",
				},
			},
			expectedEndpointFamily: ipv4,
		},
		{
			name:       "v4 service, in a dual stack ipv6-primary cluster",
			ipFamilies: ipv6ipv4,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "10.0.0.1",
				},
			},
			expectedEndpointFamily: ipv4,
		},
		{
			name:       "v4 headless service, in a single stack cluster",
			ipFamilies: ipv4only,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: v1.ClusterIPNone,
				},
			},
			expectedEndpointFamily: ipv4,
		},
		{
			name:       "v4 headless service, in a dual stack cluster",
			ipFamilies: ipv4ipv6,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP:  v1.ClusterIPNone,
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
				},
			},
			expectedEndpointFamily: ipv4,
		},
		{
			name:       "v4 legacy headless service, in a dual stack cluster",
			ipFamilies: ipv4ipv6,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: v1.ClusterIPNone,
				},
			},
			expectedEndpointFamily: ipv4,
		},
		{
			name:       "v4 legacy headless service, in a dual stack ipv6-primary cluster",
			ipFamilies: ipv6ipv4,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: v1.ClusterIPNone,
				},
			},
			expectedEndpointFamily: ipv6,
		},
		{
			name:       "v6 service, in a dual stack cluster",
			ipFamilies: ipv4ipv6,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "3000::1",
				},
			},
			expectedEndpointFamily: ipv6,
		},
		{
			name:       "v6 headless service, in a single stack cluster",
			ipFamilies: ipv6only,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: v1.ClusterIPNone,
				},
			},
			expectedEndpointFamily: ipv6,
		},
		{
			name:       "v6 headless service, in a dual stack cluster (connected to a new api-server)",
			ipFamilies: ipv4ipv6,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP:  v1.ClusterIPNone,
					IPFamilies: []v1.IPFamily{v1.IPv6Protocol}, // <- set by a api-server defaulting logic
				},
			},
			expectedEndpointFamily: ipv6,
		},
		{
			name:       "v6 legacy headless service, in a dual stack cluster (connected to a old api-server)",
			ipFamilies: ipv4ipv6,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: v1.ClusterIPNone, // <- families are not set by api-server
				},
			},
			expectedEndpointFamily: ipv4,
		},
		// in reality this is a misconfigured cluster
		// i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6
		// previously controller could assign wrong ip to endpoint address
		// with gate removed. this is no longer the case. this is *not* behavior change
		// because previously things would have failed in kube-proxy anyway (due to editing wrong iptables).
		//
		// NOTE(review): this case and the next differ only by the trailing period
		// in the name; because expectError is true, expectedEndpointFamily is
		// never consulted for either of them (the subtest returns early).
		{
			name:       "v6 service, in a v4 only cluster.",
			ipFamilies: ipv4only,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "3000::1",
				},
			},
			expectError:            true,
			expectedEndpointFamily: ipv4,
		},
		// but this will actually give an error
		{
			name:       "v6 service, in a v4 only cluster",
			ipFamilies: ipv4only,
			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "3000::1",
				},
			},
			expectError: true,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
			ns := "test"
			// One ready, single-port pod with one PodIP per entry in tc.ipFamilies.
			addPods(podStore, ns, 1, 1, 0, tc.ipFamilies)
			pods := podStore.List()
			if len(pods) != 1 {
				t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
			}
			pod := pods[0].(*v1.Pod)
			epa, err := podToEndpointAddressForService(&tc.service, pod)

			if err != nil && !tc.expectError {
				t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
			}

			if err == nil && tc.expectError {
				t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
			}

			// Expected error received: nothing further to verify for this case.
			if err != nil && tc.expectError {
				return
			}

			if utilnet.IsIPv6String(epa.IP) != (tc.expectedEndpointFamily == ipv6) {
				t.Fatalf("IP: expected %s, got: %s", tc.expectedEndpointFamily, epa.IP)
			}
			if *(epa.NodeName) != pod.Spec.NodeName {
				t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
			}
			if epa.TargetRef.Kind != "Pod" {
				t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
			}
			if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
				t.Fatalf("TargetRef.Namespace: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
			}
			if epa.TargetRef.Name != pod.ObjectMeta.Name {
				t.Fatalf("TargetRef.Name: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
			}
			if epa.TargetRef.UID != pod.ObjectMeta.UID {
				t.Fatalf("TargetRef.UID: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
			}
			if epa.TargetRef.ResourceVersion != "" {
				t.Fatalf("TargetRef.ResourceVersion: expected empty, got: %s", epa.TargetRef.ResourceVersion)
			}
		})
	}
}

// TestLastTriggerChangeTimeAnnotation syncs a service whose CreationTimestamp
// is triggerTime against a stored Endpoints object that has no annotations,
// and expects the PUT to carry v1.EndpointsLastChangeTriggerTime set to
// triggerTimeString.
func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
		},
	})
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Annotations: map[string]string{
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
			},
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestLastTriggerChangeTimeAnnotation_AnnotationOverridden starts from a stored
// Endpoints object annotated with oldTriggerTimeString and expects the PUT to
// replace that value with triggerTimeString, the service's CreationTimestamp.
func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Annotations: map[string]string{
				v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
		},
	})
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Annotations: map[string]string{
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
			},
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestLastTriggerChangeTimeAnnotation_AnnotationCleared starts from a stored
// Endpoints object annotated with triggerTimeString, syncs a service that has
// no CreationTimestamp, and expects the PUT to carry no annotations at all.
func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Annotations: map[string]string{
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	// Neither pod nor service has trigger time, this should cause annotation to be cleared.
	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
		},
	})
	err := endpoints.syncService(tCtx, ns+"/foo")
	if err != nil {
		t.Errorf("Unexpected error syncing service %v", err)
	}

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			}, // Annotation not set anymore.
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodUpdatesBatching(t *testing.T) {
	// podUpdate describes one pod IP change: the test sleeps for delay, then
	// rewrites the IP of the pod called podName to podIP.
	type podUpdate struct {
		delay   time.Duration
		podName string
		podIP   string
	}

	tests := []struct {
		name             string
		batchPeriod      time.Duration
		podsCount        int
		updates          []podUpdate
		finalDelay       time.Duration
		wantRequestCount int
	}{
		{
			name:        "three updates with no batching",
			batchPeriod: 0 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
					podIP:   "10.0.0.0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
					podIP:   "10.0.0.1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
					podIP:   "10.0.0.2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 3,
		},
		{
			name:        "three updates in one batch",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
					podIP:   "10.0.0.0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
					podIP:   "10.0.0.1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
					podIP:   "10.0.0.2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 1,
		},
		{
			name:        "three updates in two batches",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
					podIP:   "10.0.0.0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
					podIP:   "10.0.0.1",
				},
				{
					delay:   1 * time.Second,
					podName: "pod2",
					podIP:   "10.0.0.2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 2,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ns := "other"
			// Counter used to give every updated pod a fresh ResourceVersion.
			resourceVersion := 1
			testServer, endpointsHandler := makeTestServer(t, ns)
			defer testServer.Close()

			tCtx := ktesting.Init(t)
			endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
			endpoints.podsSynced = alwaysReady
			endpoints.servicesSynced = alwaysReady
			endpoints.endpointsSynced = alwaysReady
			endpoints.workerLoopPeriod = 10 * time.Millisecond

			go endpoints.Run(tCtx, 1)

			addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)

			endpoints.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector: map[string]string{"foo": "bar"},
					Ports:    []v1.ServicePort{{Port: 80}},
				},
			})

			for _, update := range tc.updates {
				time.Sleep(update.delay)

				old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
				if err != nil {
					t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
				}
				if !exists {
					t.Fatalf("Pod %q doesn't exist", update.podName)
				}
				oldPod := old.(*v1.Pod)
				// Mutate a copy so oldPod still holds the pre-update state that is
				// passed to updatePod below.
				newPod := oldPod.DeepCopy()
				newPod.Status.PodIP = update.podIP
				newPod.Status.PodIPs[0].IP = update.podIP
				newPod.ResourceVersion = strconv.Itoa(resourceVersion)
				resourceVersion++

				endpoints.podStore.Update(newPod)
				endpoints.updatePod(oldPod, newPod)
			}

			// Wait for finalDelay before counting the requests the handler received.
			time.Sleep(tc.finalDelay)
			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
		})
	}
}

// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodAddsBatching(t *testing.T) { type podAdd struct { delay time.Duration } tests := []struct { name string batchPeriod time.Duration adds []podAdd finalDelay time.Duration wantRequestCount int }{ { name: "three adds with no batching", batchPeriod: 0 * time.Second, adds: []podAdd{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, }, finalDelay: 3 * time.Second, wantRequestCount: 3, }, { name: "three adds in one batch", batchPeriod: 1 * time.Second, adds: []podAdd{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, }, finalDelay: 3 * time.Second, wantRequestCount: 1, }, { name: "three adds in two batches", batchPeriod: 1 * time.Second, adds: []podAdd{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, { delay: 1 * time.Second, }, }, finalDelay: 3 * time.Second, wantRequestCount: 2, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, tc.batchPeriod) endpoints.podsSynced = alwaysReady endpoints.servicesSynced = alwaysReady endpoints.endpointsSynced = alwaysReady endpoints.workerLoopPeriod = 10 * time.Millisecond go endpoints.Run(tCtx, 1) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80}}, }, }) for i, add := range tc.adds { time.Sleep(add.delay) p := testPod(ns, i, 1, true, ipv4only) endpoints.podStore.Add(p) endpoints.addPod(p) } time.Sleep(tc.finalDelay) endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount) }) } } 
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together. // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now. // TODO(mborsz): Migrate this test to mock clock when possible. func TestPodDeleteBatching(t *testing.T) { type podDelete struct { delay time.Duration podName string } tests := []struct { name string batchPeriod time.Duration podsCount int deletes []podDelete finalDelay time.Duration wantRequestCount int }{ { name: "three deletes with no batching", batchPeriod: 0 * time.Second, podsCount: 10, deletes: []podDelete{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, podName: "pod0", }, { delay: 100 * time.Millisecond, podName: "pod1", }, { delay: 100 * time.Millisecond, podName: "pod2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 3, }, { name: "three deletes in one batch", batchPeriod: 1 * time.Second, podsCount: 10, deletes: []podDelete{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, podName: "pod0", }, { delay: 100 * time.Millisecond, podName: "pod1", }, { delay: 100 * time.Millisecond, podName: "pod2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 1, }, { name: "three deletes in two batches", batchPeriod: 1 * time.Second, podsCount: 10, deletes: []podDelete{ { // endpoints.Run needs ~100 ms to start processing updates. 
delay: 200 * time.Millisecond, podName: "pod0", }, { delay: 100 * time.Millisecond, podName: "pod1", }, { delay: 1 * time.Second, podName: "pod2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 2, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, tc.batchPeriod) endpoints.podsSynced = alwaysReady endpoints.servicesSynced = alwaysReady endpoints.endpointsSynced = alwaysReady endpoints.workerLoopPeriod = 10 * time.Millisecond go endpoints.Run(tCtx, 1) addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{Port: 80}}, }, }) for _, update := range tc.deletes { time.Sleep(update.delay) old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName)) if err != nil { t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err) } if !exists { t.Fatalf("Pod %q doesn't exist", update.podName) } endpoints.podStore.Delete(old) endpoints.deletePod(old) } time.Sleep(tc.finalDelay) endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount) }) } } func TestSyncEndpointsServiceNotFound(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() tCtx := ktesting.Init(t) endpoints := newController(tCtx, testServer.URL, 0) endpoints.endpointsStore.Add(&v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: ns, ResourceVersion: "1", }, }) err := endpoints.syncService(tCtx, ns+"/foo") if err != nil { t.Errorf("Unexpected error syncing service %v", err) } endpointsHandler.ValidateRequestCount(t, 1) endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", 
nil)
}

// TestSyncServiceOverCapacity seeds an Endpoints object with numExisting ready
// addresses (optionally carrying an EndpointsOverCapacity annotation), syncs the
// service against numDesired ready and numDesiredNotReady not-ready pods, and
// then checks the number of ready / not-ready addresses written back as well as
// the presence and value ("truncated") of the EndpointsOverCapacity annotation.
func TestSyncServiceOverCapacity(t *testing.T) {
	testCases := []struct {
		name                string
		startingAnnotation  *string
		numExisting         int
		numDesired          int
		numDesiredNotReady  int
		numExpectedReady    int
		numExpectedNotReady int
		expectedAnnotation  bool
	}{{
		name:                "empty",
		startingAnnotation:  nil,
		numExisting:         0,
		numDesired:          0,
		numExpectedReady:    0,
		numExpectedNotReady: 0,
		expectedAnnotation:  false,
	}, {
		name:                "annotation added past capacity, < than maxCapacity of Ready Addresses",
		startingAnnotation:  nil,
		numExisting:         maxCapacity - 1,
		numDesired:          maxCapacity - 3,
		numDesiredNotReady:  4,
		numExpectedReady:    maxCapacity - 3,
		numExpectedNotReady: 3,
		expectedAnnotation:  true,
	}, {
		name:                "annotation added past capacity, maxCapacity of Ready Addresses ",
		startingAnnotation:  nil,
		numExisting:         maxCapacity - 1,
		numDesired:          maxCapacity,
		numDesiredNotReady:  10,
		numExpectedReady:    maxCapacity,
		numExpectedNotReady: 0,
		expectedAnnotation:  true,
	}, {
		name:                "annotation removed below capacity",
		startingAnnotation:  pointer.String("truncated"),
		numExisting:         maxCapacity - 1,
		numDesired:          maxCapacity - 1,
		numDesiredNotReady:  0,
		numExpectedReady:    maxCapacity - 1,
		numExpectedNotReady: 0,
		expectedAnnotation:  false,
	}, {
		name:                "annotation was set to warning previously, annotation removed at capacity",
		startingAnnotation:  pointer.String("warning"),
		numExisting:         maxCapacity,
		numDesired:          maxCapacity,
		numDesiredNotReady:  0,
		numExpectedReady:    maxCapacity,
		numExpectedNotReady: 0,
		expectedAnnotation:  false,
	}, {
		name:                "annotation was set to warning previously but still over capacity",
		startingAnnotation:  pointer.String("warning"),
		numExisting:         maxCapacity + 1,
		numDesired:          maxCapacity + 1,
		numDesiredNotReady:  0,
		numExpectedReady:    maxCapacity,
		numExpectedNotReady: 0,
		expectedAnnotation:  true,
	}, {
		name:                "annotation removed at capacity",
		startingAnnotation:  pointer.String("truncated"),
		numExisting:         maxCapacity,
		numDesired:          maxCapacity,
		numDesiredNotReady:  0,
		numExpectedReady:    maxCapacity,
		numExpectedNotReady: 0,
		expectedAnnotation:  false,
	}, {
		name:                "no endpoints change, annotation value corrected",
		startingAnnotation:  pointer.String("invalid"),
		numExisting:         maxCapacity + 1,
		numDesired:          maxCapacity + 1,
		numDesiredNotReady:  0,
		numExpectedReady:    maxCapacity,
		numExpectedNotReady: 0,
		expectedAnnotation:  true,
	}}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			tCtx := ktesting.Init(t)
			ns := "test"
			client, c := newFakeController(tCtx, 0*time.Second)

			addPods(c.podStore, ns, tc.numDesired, 1, tc.numDesiredNotReady, ipv4only)
			pods := c.podStore.List()

			svc := &v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector: map[string]string{"foo": "bar"},
					Ports:    []v1.ServicePort{{Port: 80}},
				},
			}
			c.serviceStore.Add(svc)

			// Build the pre-existing Endpoints object from the first
			// numExisting pods in the store.
			subset := v1.EndpointSubset{}
			for i := 0; i < tc.numExisting; i++ {
				pod := pods[i].(*v1.Pod)
				epa, err := podToEndpointAddressForService(svc, pod)
				if err != nil {
					// epa is dereferenced below, so a failed conversion must stop the test.
					t.Fatalf("unexpected error building endpoint address for pod %s: %v", pod.Name, err)
				}
				subset.Addresses = append(subset.Addresses, *epa)
			}
			endpoints := &v1.Endpoints{
				ObjectMeta: metav1.ObjectMeta{
					Name:            svc.Name,
					Namespace:       ns,
					ResourceVersion: "1",
					Annotations:     map[string]string{},
				},
				Subsets: []v1.EndpointSubset{subset},
			}
			if tc.startingAnnotation != nil {
				endpoints.Annotations[v1.EndpointsOverCapacity] = *tc.startingAnnotation
			}
			c.endpointsStore.Add(endpoints)
			_, err := client.CoreV1().Endpoints(ns).Create(tCtx, endpoints, metav1.CreateOptions{})
			if err != nil {
				t.Fatalf("unexpected error creating endpoints: %v", err)
			}

			err = c.syncService(tCtx, fmt.Sprintf("%s/%s", ns, svc.Name))
			if err != nil {
				t.Errorf("Unexpected error syncing service %v", err)
			}

			actualEndpoints, err := client.CoreV1().Endpoints(ns).Get(tCtx, endpoints.Name, metav1.GetOptions{})
			if err != nil {
				t.Fatalf("unexpected error getting endpoints: %v", err)
			}

			actualAnnotation, ok := actualEndpoints.Annotations[v1.EndpointsOverCapacity]
			switch {
			case tc.expectedAnnotation && !ok:
				t.Errorf("Expected EndpointsOverCapacity annotation to be set")
			case tc.expectedAnnotation && actualAnnotation != "truncated":
				t.Errorf("Expected EndpointsOverCapacity annotation to be 'truncated', got %s", actualAnnotation)
			case !tc.expectedAnnotation && ok:
				t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation)
			}

			numActualReady := 0
			numActualNotReady := 0
			for _, subset := range actualEndpoints.Subsets {
				numActualReady += len(subset.Addresses)
				numActualNotReady += len(subset.NotReadyAddresses)
			}
			if numActualReady != tc.numExpectedReady {
				t.Errorf("Unexpected number of actual ready Endpoints: got %d endpoints, want %d endpoints", numActualReady, tc.numExpectedReady)
			}
			if numActualNotReady != tc.numExpectedNotReady {
				t.Errorf("Unexpected number of actual not ready Endpoints: got %d endpoints, want %d endpoints", numActualNotReady, tc.numExpectedNotReady)
			}
		})
	}
}

// TestTruncateEndpoints builds an Endpoints object whose subsets contain the
// given numbers of (empty) ready and not-ready addresses, calls
// truncateEndpoints on it, and compares the per-subset address counts that
// remain with the expected counts.
func TestTruncateEndpoints(t *testing.T) {
	testCases := []struct {
		desc string
		// subsetsReady, subsetsNotReady, expectedReady, expectedNotReady
		// must all be the same length
		subsetsReady     []int
		subsetsNotReady  []int
		expectedReady    []int
		expectedNotReady []int
	}{{
		desc:             "empty",
		subsetsReady:     []int{},
		subsetsNotReady:  []int{},
		expectedReady:    []int{},
		expectedNotReady: []int{},
	}, {
		desc:             "total endpoints < max capacity",
		subsetsReady:     []int{50, 100, 100, 100, 100},
		subsetsNotReady:  []int{50, 100, 100, 100, 100},
		expectedReady:    []int{50, 100, 100, 100, 100},
		expectedNotReady: []int{50, 100, 100, 100, 100},
	}, {
		desc:             "total endpoints = max capacity",
		subsetsReady:     []int{100, 100, 100, 100, 100},
		subsetsNotReady:  []int{100, 100, 100, 100, 100},
		expectedReady:    []int{100, 100, 100, 100, 100},
		expectedNotReady: []int{100, 100, 100, 100, 100},
	}, {
		desc:             "total ready endpoints < max capacity, but total endpoints > max capacity",
		subsetsReady:     []int{90, 110, 50, 10, 20},
		subsetsNotReady:  []int{101, 200, 200, 201, 298},
		expectedReady:    []int{90, 110, 50, 10, 20},
		expectedNotReady: []int{73, 144, 144, 145, 214},
	}, {
		desc:             "total ready endpoints > max capacity",
		subsetsReady:     []int{205, 400, 402, 400, 693},
		subsetsNotReady:  []int{100, 200, 200, 200, 300},
		expectedReady:    []int{98, 191, 192, 191, 328},
		expectedNotReady: []int{0, 0, 0, 0, 0},
	}}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			var subsets []v1.EndpointSubset
			for subsetIndex, numReady := range tc.subsetsReady {
				subset := v1.EndpointSubset{}
				for i := 0; i < numReady; i++ {
					subset.Addresses = append(subset.Addresses, v1.EndpointAddress{})
				}

				numNotReady := tc.subsetsNotReady[subsetIndex]
				for i := 0; i < numNotReady; i++ {
					subset.NotReadyAddresses = append(subset.NotReadyAddresses, v1.EndpointAddress{})
				}
				subsets = append(subsets, subset)
			}

			endpoints := &v1.Endpoints{Subsets: subsets}
			truncateEndpoints(endpoints)

			for i, subset := range endpoints.Subsets {
				if len(subset.Addresses) != tc.expectedReady[i] {
					t.Errorf("Unexpected number of actual ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.Addresses), tc.expectedReady[i])
				}
				if len(subset.NotReadyAddresses) != tc.expectedNotReady[i] {
					t.Errorf("Unexpected number of actual not ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.NotReadyAddresses), tc.expectedNotReady[i])
				}
			}
		})
	}
}

// TestEndpointPortFromServicePort checks that the AppProtocol pointer set on a
// ServicePort is the one returned on the EndpointPort produced by
// endpointPortFromServicePort (compared by pointer identity).
func TestEndpointPortFromServicePort(t *testing.T) {
	// Named httpProtocol rather than http so the net/http import is not shadowed.
	httpProtocol := pointer.String("http")
	testCases := map[string]struct {
		serviceAppProtocol           *string
		expectedEndpointsAppProtocol *string
	}{
		"empty app protocol": {
			serviceAppProtocol:           nil,
			expectedEndpointsAppProtocol: nil,
		},
		"http app protocol": {
			serviceAppProtocol:           httpProtocol,
			expectedEndpointsAppProtocol: httpProtocol,
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			epp := endpointPortFromServicePort(&v1.ServicePort{Name: "test", AppProtocol: tc.serviceAppProtocol}, 80)

			if epp.AppProtocol != tc.expectedEndpointsAppProtocol {
				t.Errorf("Expected Endpoints AppProtocol to be %s, got %s", stringVal(tc.expectedEndpointsAppProtocol), stringVal(epp.AppProtocol))
			}
		})
	}
}

// TestMultipleServiceChanges tests that endpoints which are not created because
// of an out-of-sync endpoints cache are eventually recreated.
//
// A service is created. Once its endpoints exist, the service is deleted, but
// the endpoints are not removed from the cache immediately. The service is then
// recreated and only afterwards are the endpoints deleted, replicating an
// out-of-sync cache. The test expects the endpoints to be recreated eventually.
func TestMultipleServiceChanges(t *testing.T) {
	ns := metav1.NamespaceDefault
	expectedSubsets := []v1.EndpointSubset{{
		Addresses: []v1.EndpointAddress{
			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
		},
	}}
	endpoint := &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
		Subsets:    expectedSubsets,
	}

	// The test server is handed the controller pointer before the controller
	// is constructed; the pointee is filled in below once the server URL is known.
	controller := &endpointController{}
	blockDelete := make(chan struct{})
	blockNextAction := make(chan struct{})
	testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	*controller = *newController(tCtx, testServer.URL, 0*time.Second)
	addPods(controller.podStore, ns, 1, 1, 0, ipv4only)

	go func() { controller.Run(tCtx, 1) }()

	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector:  map[string]string{"foo": "bar"},
			ClusterIP: "None",
			Ports:     nil,
		},
	}

	controller.serviceStore.Add(svc)
	controller.onServiceUpdate(svc)
	// blockNextAction should eventually unblock once server gets endpoint request.
	waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Add should have caused a request to be sent to the test server")

	controller.serviceStore.Delete(svc)
	controller.onServiceDelete(svc)
	waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Delete should have caused a request to be sent to the test server")

	// If endpoints cache has not updated before service update is registered
	// Services add will not trigger a Create endpoint request.
	controller.serviceStore.Add(svc)
	controller.onServiceUpdate(svc)

	// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
	// This wait is best-effort: a timeout is only logged, and the assertions
	// below decide whether the test passes.
	if err := wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
		return controller.queue.Len() == 0, nil
	}); err != nil {
		t.Logf("work queue was not drained before the timeout: %v", err)
	}

	// Cause test server to delete endpoints
	close(blockDelete)
	waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoint should have been recreated")

	close(blockNextAction)
}

// TestSyncServiceAddresses adds a single pod (in a given phase, readiness and
// termination state) and a single Service (with or without
// PublishNotReadyAddresses) to the controller's stores, runs syncService, and
// checks how many ready and not-ready addresses end up in the Endpoints object.
func TestSyncServiceAddresses(t *testing.T) {
	makeService := func(tolerateUnready bool) *v1.Service {
		return &v1.Service{
			ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"},
			Spec: v1.ServiceSpec{
				Selector:                 map[string]string{"foo": "bar"},
				PublishNotReadyAddresses: tolerateUnready,
				Type:                     v1.ServiceTypeClusterIP,
				ClusterIP:                "1.1.1.1",
				Ports:                    []v1.ServicePort{{Port: 80}},
			},
		}
	}

	makePod := func(phase v1.PodPhase, isReady bool, terminating bool) *v1.Pod {
		statusCondition := v1.ConditionFalse
		if isReady {
			statusCondition = v1.ConditionTrue
		}

		// A non-nil DeletionTimestamp marks the pod as terminating.
		var deletionTimestamp *metav1.Time
		if terminating {
			now := metav1.Now()
			deletionTimestamp = &now
		}
		return &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Namespace:         "ns",
				Name:              "fakepod",
				DeletionTimestamp: deletionTimestamp,
				Labels:            map[string]string{"foo": "bar"},
			},
			Spec: v1.PodSpec{
				Containers: []v1.Container{{Ports: []v1.ContainerPort{
					{Name: "port1", ContainerPort: int32(8080)},
				}}},
			},
			Status: v1.PodStatus{
				Phase: phase,
				Conditions: []v1.PodCondition{
					{
						Type:   v1.PodReady,
						Status: statusCondition,
					},
				},
				PodIP: "10.1.1.1",
				PodIPs: []v1.PodIP{
					{IP: "10.1.1.1"},
				},
			},
		}
	}

	testCases := []struct {
		name            string
		pod             *v1.Pod
		service         *v1.Service
		expectedReady   int
		expectedUnready int
	}{
		{
			name:            "pod running phase",
			pod:             makePod(v1.PodRunning, true, false),
			service:         makeService(false),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod running phase being deleted",
			pod:             makePod(v1.PodRunning, true, true),
			service:         makeService(false),
			expectedReady:   0,
			expectedUnready: 0,
		},
		{
			name:            "pod unknown phase container ready",
			pod:             makePod(v1.PodUnknown, true, false),
			service:         makeService(false),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod unknown phase container ready being deleted",
			pod:             makePod(v1.PodUnknown, true, true),
			service:         makeService(false),
			expectedReady:   0,
			expectedUnready: 0,
		},
		{
			name:            "pod pending phase container ready",
			pod:             makePod(v1.PodPending, true, false),
			service:         makeService(false),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod pending phase container ready being deleted",
			pod:             makePod(v1.PodPending, true, true),
			service:         makeService(false),
			expectedReady:   0,
			expectedUnready: 0,
		},
		{
			name:            "pod unknown phase container not ready",
			pod:             makePod(v1.PodUnknown, false, false),
			service:         makeService(false),
			expectedReady:   0,
			expectedUnready: 1,
		},
		{
			name:            "pod pending phase container not ready",
			pod:             makePod(v1.PodPending, false, false),
			service:         makeService(false),
			expectedReady:   0,
			expectedUnready: 1,
		},
		{
			name:            "pod failed phase",
			pod:             makePod(v1.PodFailed, false, false),
			service:         makeService(false),
			expectedReady:   0,
			expectedUnready: 0,
		},
		{
			name:            "pod succeeded phase",
			pod:             makePod(v1.PodSucceeded, false, false),
			service:         makeService(false),
			expectedReady:   0,
			expectedUnready: 0,
		},
		{
			name:            "pod running phase and tolerate unready",
			pod:             makePod(v1.PodRunning, false, false),
			service:         makeService(true),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod running phase and tolerate unready being deleted",
			pod:             makePod(v1.PodRunning, false, true),
			service:         makeService(true),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod unknown phase and tolerate unready",
			pod:             makePod(v1.PodUnknown, false, false),
			service:         makeService(true),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod unknown phase and tolerate unready being deleted",
			pod:             makePod(v1.PodUnknown, false, true),
			service:         makeService(true),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod pending phase and tolerate unready",
			pod:             makePod(v1.PodPending, false, false),
			service:         makeService(true),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod pending phase and tolerate unready being deleted",
			pod:             makePod(v1.PodPending, false, true),
			service:         makeService(true),
			expectedReady:   1,
			expectedUnready: 0,
		},
		{
			name:            "pod failed phase and tolerate unready",
			pod:             makePod(v1.PodFailed, false, false),
			service:         makeService(true),
			expectedReady:   0,
			expectedUnready: 0,
		},
		{
			name:            "pod succeeded phase and tolerate unready endpoints",
			pod:             makePod(v1.PodSucceeded, false, false),
			service:         makeService(true),
			expectedReady:   0,
			expectedUnready: 0,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			tCtx := ktesting.Init(t)

			ns := tc.service.Namespace
			client, c := newFakeController(tCtx, 0*time.Second)

			err := c.podStore.Add(tc.pod)
			if err != nil {
				t.Errorf("Unexpected error adding pod %v", err)
			}
			err = c.serviceStore.Add(tc.service)
			if err != nil {
				t.Errorf("Unexpected error adding service %v", err)
			}
			err = c.syncService(tCtx, fmt.Sprintf("%s/%s", ns, tc.service.Name))
			if err != nil {
				t.Errorf("Unexpected error syncing service %v", err)
			}

			endpoints, err := client.CoreV1().Endpoints(ns).Get(tCtx, tc.service.Name, metav1.GetOptions{})
			if err != nil {
				// endpoints is read below; stop here instead of dereferencing
				// a result that came back with an error.
				t.Fatalf("Unexpected error %v", err)
			}

			readyEndpoints := 0
			unreadyEndpoints := 0
			for _, subset := range endpoints.Subsets {
				readyEndpoints += len(subset.Addresses)
				unreadyEndpoints += len(subset.NotReadyAddresses)
			}

			if tc.expectedReady != readyEndpoints {
				t.Errorf("Expected %d ready endpoints, got %d", tc.expectedReady, readyEndpoints)
			}

			if tc.expectedUnready != unreadyEndpoints {
				t.Errorf("Expected %d unready endpoints, got %d", tc.expectedUnready, unreadyEndpoints)
			}
		})
	}
}

// TestEndpointsDeletionEvents checks that calling onEndpointsDelete for an
// Endpoints object leaves exactly one item in the controller's work queue.
func TestEndpointsDeletionEvents(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, _ := makeTestServer(t, ns)
	defer testServer.Close()

	tCtx := ktesting.Init(t)
	controller := newController(tCtx, testServer.URL, 0)
	store := controller.endpointsStore
	ep1 := &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "rv1",
		},
	}

	// Test Unexpected and Expected Deletes
	store.Delete(ep1)
	controller.onEndpointsDelete(ep1)

	if controller.queue.Len() != 1 {
		t.Errorf("Expected one service to be in the queue, found %d", controller.queue.Len())
	}
}

// stringVal returns the string str points to, or the literal "nil" when str is
// nil, so optional strings can be printed in test failure messages.
func stringVal(str *string) string {
	if str == nil {
		return "nil"
	}
	return *str
}

// waitForChanReceive blocks until a value is received from receivingChan or the
// timeout elapses, whichever happens first. On timeout it reports errorMsg as a
// test error (without aborting the test).
func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan struct{}, errorMsg string) {
	t.Helper()
	timer := time.NewTimer(timeout)
	// Release the timer as soon as we return, whichever branch fired.
	defer timer.Stop()
	select {
	case <-timer.C:
		// errorMsg is a plain message, not a format string.
		t.Error(errorMsg)
	case <-receivingChan:
	}
}

// TestEndpointSubsetsEqualIgnoreResourceVersion compares pairs of subset lists
// with endpointSubsetsEqualIgnoreResourceVersion and checks that differences in
// subsets, IPs or ports are reported as unequal, while differences only in a
// TargetRef's ResourceVersion are reported as equal.
func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) {
	// copyAndMutateEndpointSubset returns a deep copy of orig with mutator applied,
	// leaving orig untouched.
	copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset {
		newSubSet := orig.DeepCopy()
		mutator(newSubSet)
		return newSubSet
	}
	es1 := &v1.EndpointSubset{
		Addresses: []v1.EndpointAddress{
			{
				IP:        "1.1.1.1",
				TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"},
			},
		},
		NotReadyAddresses: []v1.EndpointAddress{
			{
				IP:        "1.1.1.2",
				TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"},
			},
		},
		Ports: []v1.EndpointPort{{Port: 8081, Protocol: "TCP"}},
	}
	es2 := &v1.EndpointSubset{
		Addresses: []v1.EndpointAddress{
			{
				IP:        "2.2.2.1",
				TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"},
			},
		},
		NotReadyAddresses: []v1.EndpointAddress{
			{
				IP:        "2.2.2.2",
				TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"},
			},
		},
		Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}},
	}
	tests := []struct {
		name     string
		subsets1 []v1.EndpointSubset
		subsets2 []v1.EndpointSubset
		expected bool
	}{
		{
			name:     "Subsets removed",
			subsets1: []v1.EndpointSubset{*es1, *es2},
			subsets2: []v1.EndpointSubset{*es1},
			expected: false,
		},
		{
			name:     "Ready Pod IP changed",
			subsets1: []v1.EndpointSubset{*es1, *es2},
			subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
				es.Addresses[0].IP = "1.1.1.10"
			}), *es2},
			expected: false,
		},
		{
			name:     "NotReady Pod IP changed",
			subsets1: []v1.EndpointSubset{*es1, *es2},
			subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
				es.NotReadyAddresses[0].IP = "2.2.2.10"
			})},
			expected: false,
		},
		{
			name:     "Pod ResourceVersion changed",
			subsets1: []v1.EndpointSubset{*es1, *es2},
			subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
				es.Addresses[0].TargetRef.ResourceVersion = "100"
			})},
			expected: true,
		},
		{
			name:     "Pod ResourceVersion removed",
			subsets1: []v1.EndpointSubset{*es1, *es2},
			subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
				es.Addresses[0].TargetRef.ResourceVersion = ""
			})},
			expected: true,
		},
		{
			name:     "Ports changed",
			subsets1: []v1.EndpointSubset{*es1, *es2},
			subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
				es.Ports[0].Port = 8082
			})},
			expected: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := endpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected {
				t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, tt.expected)
			}
		})
	}
}