...

Source file src/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go

Documentation: k8s.io/kubernetes/pkg/controller/endpoint

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package endpoint
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"net/http/httptest"
    24  	"reflect"
    25  	"strconv"
    26  	"testing"
    27  	"time"
    28  
    29  	"github.com/google/go-cmp/cmp"
    30  	v1 "k8s.io/api/core/v1"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/apimachinery/pkg/runtime/schema"
    34  	"k8s.io/apimachinery/pkg/util/intstr"
    35  	"k8s.io/apimachinery/pkg/util/wait"
    36  	"k8s.io/client-go/informers"
    37  	clientset "k8s.io/client-go/kubernetes"
    38  	"k8s.io/client-go/kubernetes/fake"
    39  	clientscheme "k8s.io/client-go/kubernetes/scheme"
    40  	restclient "k8s.io/client-go/rest"
    41  	"k8s.io/client-go/tools/cache"
    42  	utiltesting "k8s.io/client-go/util/testing"
    43  	endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
    44  	api "k8s.io/kubernetes/pkg/apis/core"
    45  	controllerpkg "k8s.io/kubernetes/pkg/controller"
    46  	"k8s.io/kubernetes/test/utils/ktesting"
    47  	utilnet "k8s.io/utils/net"
    48  	"k8s.io/utils/pointer"
    49  )
    50  
    51  var alwaysReady = func() bool { return true }
    52  var neverReady = func() bool { return false }
    53  var emptyNodeName string
    54  var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
    55  var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
    56  var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
    57  
    58  var ipv4only = []v1.IPFamily{v1.IPv4Protocol}
    59  var ipv6only = []v1.IPFamily{v1.IPv6Protocol}
    60  var ipv4ipv6 = []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol}
    61  var ipv6ipv4 = []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol}
    62  
    63  func testPod(namespace string, id int, nPorts int, isReady bool, ipFamilies []v1.IPFamily) *v1.Pod {
    64  	p := &v1.Pod{
    65  		TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
    66  		ObjectMeta: metav1.ObjectMeta{
    67  			Namespace:       namespace,
    68  			Name:            fmt.Sprintf("pod%d", id),
    69  			Labels:          map[string]string{"foo": "bar"},
    70  			ResourceVersion: fmt.Sprint(id),
    71  		},
    72  		Spec: v1.PodSpec{
    73  			Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
    74  		},
    75  		Status: v1.PodStatus{
    76  			Conditions: []v1.PodCondition{
    77  				{
    78  					Type:   v1.PodReady,
    79  					Status: v1.ConditionTrue,
    80  				},
    81  			},
    82  		},
    83  	}
    84  	if !isReady {
    85  		p.Status.Conditions[0].Status = v1.ConditionFalse
    86  	}
    87  	for j := 0; j < nPorts; j++ {
    88  		p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
    89  			v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
    90  	}
    91  	for _, family := range ipFamilies {
    92  		var ip string
    93  		if family == v1.IPv4Protocol {
    94  			ip = fmt.Sprintf("1.2.3.%d", 4+id)
    95  		} else {
    96  			ip = fmt.Sprintf("2000::%d", 4+id)
    97  		}
    98  		p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: ip})
    99  	}
   100  	p.Status.PodIP = p.Status.PodIPs[0].IP
   101  
   102  	return p
   103  }
   104  
   105  func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, ipFamilies []v1.IPFamily) {
   106  	for i := 0; i < nPods+nNotReady; i++ {
   107  		isReady := i < nPods
   108  		pod := testPod(namespace, i, nPorts, isReady, ipFamilies)
   109  		store.Add(pod)
   110  	}
   111  }
   112  
   113  func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
   114  	for i := 0; i < nPods; i++ {
   115  		p := &v1.Pod{
   116  			TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
   117  			ObjectMeta: metav1.ObjectMeta{
   118  				Namespace: namespace,
   119  				Name:      fmt.Sprintf("pod%d", i),
   120  				Labels:    map[string]string{"foo": "bar"},
   121  			},
   122  			Spec: v1.PodSpec{
   123  				RestartPolicy: restartPolicy,
   124  				Containers:    []v1.Container{{Ports: []v1.ContainerPort{}}},
   125  			},
   126  			Status: v1.PodStatus{
   127  				PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
   128  				Phase: podPhase,
   129  				Conditions: []v1.PodCondition{
   130  					{
   131  						Type:   v1.PodReady,
   132  						Status: v1.ConditionFalse,
   133  					},
   134  				},
   135  			},
   136  		}
   137  		for j := 0; j < nPorts; j++ {
   138  			p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
   139  				v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
   140  		}
   141  		store.Add(p)
   142  	}
   143  }
   144  
   145  func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
   146  	fakeEndpointsHandler := utiltesting.FakeHandler{
   147  		StatusCode:   http.StatusOK,
   148  		ResponseBody: runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}),
   149  	}
   150  	mux := http.NewServeMux()
   151  	if namespace == "" {
   152  		t.Fatal("namespace cannot be empty")
   153  	}
   154  	mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints", &fakeEndpointsHandler)
   155  	mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints/", &fakeEndpointsHandler)
   156  	mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
   157  		t.Errorf("unexpected request: %v", req.RequestURI)
   158  		http.Error(res, "", http.StatusNotFound)
   159  	})
   160  	return httptest.NewServer(mux), &fakeEndpointsHandler
   161  }
   162  
   163  // makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All
   164  // block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will
   165  // be sent in the response.
   166  func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
   167  
   168  	handlerFunc := func(res http.ResponseWriter, req *http.Request) {
   169  		if controller == nil {
   170  			res.WriteHeader(http.StatusInternalServerError)
   171  			res.Write([]byte("controller has not been set yet"))
   172  			return
   173  		}
   174  
   175  		if req.Method == "POST" {
   176  			controller.endpointsStore.Add(endpoint)
   177  			blockNextAction <- struct{}{}
   178  		}
   179  
   180  		if req.Method == "DELETE" {
   181  			go func() {
   182  				// Delay the deletion of endoints to make endpoint cache out of sync
   183  				<-blockDelete
   184  				controller.endpointsStore.Delete(endpoint)
   185  				controller.onEndpointsDelete(endpoint)
   186  			}()
   187  			blockNextAction <- struct{}{}
   188  		}
   189  
   190  		res.WriteHeader(http.StatusOK)
   191  		res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{})))
   192  	}
   193  
   194  	mux := http.NewServeMux()
   195  	mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints", handlerFunc)
   196  	mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints/", handlerFunc)
   197  	mux.HandleFunc("/api/v1/namespaces/"+namespace+"/events", func(res http.ResponseWriter, req *http.Request) {})
   198  	mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
   199  		t.Errorf("unexpected request: %v", req.RequestURI)
   200  		http.Error(res, "", http.StatusNotFound)
   201  	})
   202  	return httptest.NewServer(mux)
   203  
   204  }
   205  
   206  type endpointController struct {
   207  	*Controller
   208  	podStore       cache.Store
   209  	serviceStore   cache.Store
   210  	endpointsStore cache.Store
   211  }
   212  
   213  func newController(ctx context.Context, url string, batchPeriod time.Duration) *endpointController {
   214  	client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
   215  	informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc())
   216  	endpoints := NewEndpointController(ctx, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
   217  		informerFactory.Core().V1().Endpoints(), client, batchPeriod)
   218  	endpoints.podsSynced = alwaysReady
   219  	endpoints.servicesSynced = alwaysReady
   220  	endpoints.endpointsSynced = alwaysReady
   221  	return &endpointController{
   222  		endpoints,
   223  		informerFactory.Core().V1().Pods().Informer().GetStore(),
   224  		informerFactory.Core().V1().Services().Informer().GetStore(),
   225  		informerFactory.Core().V1().Endpoints().Informer().GetStore(),
   226  	}
   227  }
   228  
   229  func newFakeController(ctx context.Context, batchPeriod time.Duration) (*fake.Clientset, *endpointController) {
   230  	client := fake.NewSimpleClientset()
   231  	informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc())
   232  
   233  	eController := NewEndpointController(
   234  		ctx,
   235  		informerFactory.Core().V1().Pods(),
   236  		informerFactory.Core().V1().Services(),
   237  		informerFactory.Core().V1().Endpoints(),
   238  		client,
   239  		batchPeriod)
   240  
   241  	eController.podsSynced = alwaysReady
   242  	eController.servicesSynced = alwaysReady
   243  	eController.endpointsSynced = alwaysReady
   244  
   245  	return client, &endpointController{
   246  		eController,
   247  		informerFactory.Core().V1().Pods().Informer().GetStore(),
   248  		informerFactory.Core().V1().Services().Informer().GetStore(),
   249  		informerFactory.Core().V1().Endpoints().Informer().GetStore(),
   250  	}
   251  }
   252  
   253  func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
   254  	ns := metav1.NamespaceDefault
   255  	testServer, endpointsHandler := makeTestServer(t, ns)
   256  	defer testServer.Close()
   257  
   258  	tCtx := ktesting.Init(t)
   259  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   260  	endpoints.endpointsStore.Add(&v1.Endpoints{
   261  		ObjectMeta: metav1.ObjectMeta{
   262  			Name:            "foo",
   263  			Namespace:       ns,
   264  			ResourceVersion: "1",
   265  		},
   266  		Subsets: []v1.EndpointSubset{{
   267  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
   268  			Ports:     []v1.EndpointPort{{Port: 1000}},
   269  		}},
   270  	})
   271  	endpoints.serviceStore.Add(&v1.Service{
   272  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   273  		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
   274  	})
   275  	err := endpoints.syncService(tCtx, ns+"/foo")
   276  	if err != nil {
   277  		t.Errorf("Unexpected error syncing service %v", err)
   278  	}
   279  	endpointsHandler.ValidateRequestCount(t, 0)
   280  }
   281  
   282  func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
   283  	ns := metav1.NamespaceDefault
   284  	testServer, endpointsHandler := makeTestServer(t, ns)
   285  	defer testServer.Close()
   286  
   287  	tCtx := ktesting.Init(t)
   288  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   289  	endpoints.endpointsStore.Add(&v1.Endpoints{
   290  		ObjectMeta: metav1.ObjectMeta{
   291  			Name:            "foo",
   292  			Namespace:       ns,
   293  			ResourceVersion: "1",
   294  		},
   295  		Subsets: nil,
   296  	})
   297  	endpoints.serviceStore.Add(&v1.Service{
   298  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   299  		Spec: v1.ServiceSpec{
   300  			Selector: map[string]string{"foo": "bar"},
   301  			Ports:    []v1.ServicePort{{Port: 80}},
   302  		},
   303  	})
   304  	err := endpoints.syncService(tCtx, ns+"/foo")
   305  	if err != nil {
   306  		t.Errorf("Unexpected error syncing service %v", err)
   307  	}
   308  	endpointsHandler.ValidateRequestCount(t, 0)
   309  }
   310  
   311  func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
   312  	ns := metav1.NamespaceDefault
   313  	testServer, endpointsHandler := makeTestServer(t, ns)
   314  	defer testServer.Close()
   315  
   316  	tCtx := ktesting.Init(t)
   317  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   318  	endpoints.endpointsStore.Add(&v1.Endpoints{
   319  		ObjectMeta: metav1.ObjectMeta{
   320  			Name:            "foo",
   321  			Namespace:       ns,
   322  			ResourceVersion: "1",
   323  		},
   324  		Subsets: []v1.EndpointSubset{},
   325  	})
   326  	endpoints.serviceStore.Add(&v1.Service{
   327  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   328  		Spec: v1.ServiceSpec{
   329  			Selector: map[string]string{"foo": "bar"},
   330  			Ports:    []v1.ServicePort{{Port: 80}},
   331  		},
   332  	})
   333  	err := endpoints.syncService(tCtx, ns+"/foo")
   334  	if err != nil {
   335  		t.Errorf("Unexpected error syncing service %v", err)
   336  	}
   337  	endpointsHandler.ValidateRequestCount(t, 0)
   338  }
   339  
   340  func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) {
   341  	ns := metav1.NamespaceDefault
   342  	testServer, endpointsHandler := makeTestServer(t, ns)
   343  	defer testServer.Close()
   344  	pod0 := testPod(ns, 0, 1, true, ipv4only)
   345  	pod1 := testPod(ns, 1, 1, false, ipv4only)
   346  	tCtx := ktesting.Init(t)
   347  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   348  	endpoints.endpointsStore.Add(&v1.Endpoints{
   349  		ObjectMeta: metav1.ObjectMeta{
   350  			Name:            "foo",
   351  			Namespace:       ns,
   352  			ResourceVersion: "1",
   353  		},
   354  		Subsets: []v1.EndpointSubset{{
   355  			Addresses: []v1.EndpointAddress{
   356  				{
   357  					IP:        pod0.Status.PodIPs[0].IP,
   358  					NodeName:  &emptyNodeName,
   359  					TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod0.Name, Namespace: ns, ResourceVersion: "1"},
   360  				},
   361  			},
   362  			NotReadyAddresses: []v1.EndpointAddress{
   363  				{
   364  					IP:        pod1.Status.PodIPs[0].IP,
   365  					NodeName:  &emptyNodeName,
   366  					TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod1.Name, Namespace: ns, ResourceVersion: "2"},
   367  				},
   368  			},
   369  			Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
   370  		}},
   371  	})
   372  	endpoints.serviceStore.Add(&v1.Service{
   373  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   374  		Spec: v1.ServiceSpec{
   375  			Selector: map[string]string{"foo": "bar"},
   376  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
   377  		},
   378  	})
   379  	pod0.ResourceVersion = "3"
   380  	pod1.ResourceVersion = "4"
   381  	endpoints.podStore.Add(pod0)
   382  	endpoints.podStore.Add(pod1)
   383  	err := endpoints.syncService(tCtx, ns+"/foo")
   384  	if err != nil {
   385  		t.Errorf("Unexpected error syncing service %v", err)
   386  	}
   387  
   388  	endpointsHandler.ValidateRequestCount(t, 0)
   389  }
   390  
   391  func TestSyncEndpointsNewNoSubsets(t *testing.T) {
   392  	ns := metav1.NamespaceDefault
   393  	testServer, endpointsHandler := makeTestServer(t, ns)
   394  	defer testServer.Close()
   395  	tCtx := ktesting.Init(t)
   396  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   397  	endpoints.serviceStore.Add(&v1.Service{
   398  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   399  		Spec: v1.ServiceSpec{
   400  			Selector: map[string]string{"foo": "bar"},
   401  			Ports:    []v1.ServicePort{{Port: 80}},
   402  		},
   403  	})
   404  	err := endpoints.syncService(tCtx, ns+"/foo")
   405  	if err != nil {
   406  		t.Errorf("Unexpected error syncing service %v", err)
   407  	}
   408  	endpointsHandler.ValidateRequestCount(t, 1)
   409  }
   410  
   411  func TestCheckLeftoverEndpoints(t *testing.T) {
   412  	ns := metav1.NamespaceDefault
   413  	testServer, _ := makeTestServer(t, ns)
   414  	defer testServer.Close()
   415  
   416  	tCtx := ktesting.Init(t)
   417  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   418  	endpoints.endpointsStore.Add(&v1.Endpoints{
   419  		ObjectMeta: metav1.ObjectMeta{
   420  			Name:            "foo",
   421  			Namespace:       ns,
   422  			ResourceVersion: "1",
   423  		},
   424  		Subsets: []v1.EndpointSubset{{
   425  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
   426  			Ports:     []v1.EndpointPort{{Port: 1000}},
   427  		}},
   428  	})
   429  	endpoints.checkLeftoverEndpoints()
   430  	if e, a := 1, endpoints.queue.Len(); e != a {
   431  		t.Fatalf("Expected %v, got %v", e, a)
   432  	}
   433  	got, _ := endpoints.queue.Get()
   434  	if e, a := ns+"/foo", got; e != a {
   435  		t.Errorf("Expected %v, got %v", e, a)
   436  	}
   437  }
   438  
   439  func TestSyncEndpointsProtocolTCP(t *testing.T) {
   440  	ns := "other"
   441  	testServer, endpointsHandler := makeTestServer(t, ns)
   442  	defer testServer.Close()
   443  	tCtx := ktesting.Init(t)
   444  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   445  	endpoints.endpointsStore.Add(&v1.Endpoints{
   446  		ObjectMeta: metav1.ObjectMeta{
   447  			Name:            "foo",
   448  			Namespace:       ns,
   449  			ResourceVersion: "1",
   450  		},
   451  		Subsets: []v1.EndpointSubset{{
   452  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
   453  			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
   454  		}},
   455  	})
   456  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
   457  	endpoints.serviceStore.Add(&v1.Service{
   458  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   459  		Spec: v1.ServiceSpec{
   460  			Selector: map[string]string{},
   461  			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
   462  		},
   463  	})
   464  	err := endpoints.syncService(tCtx, ns+"/foo")
   465  	if err != nil {
   466  		t.Errorf("Unexpected error syncing service %v", err)
   467  	}
   468  
   469  	endpointsHandler.ValidateRequestCount(t, 1)
   470  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   471  		ObjectMeta: metav1.ObjectMeta{
   472  			Name:            "foo",
   473  			Namespace:       ns,
   474  			ResourceVersion: "1",
   475  			Labels: map[string]string{
   476  				v1.IsHeadlessService: "",
   477  			},
   478  		},
   479  		Subsets: []v1.EndpointSubset{{
   480  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
   481  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
   482  		}},
   483  	})
   484  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
   485  }
   486  
   487  func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) {
   488  	ns := metav1.NamespaceDefault
   489  	testServer, endpointsHandler := makeTestServer(t, ns)
   490  	defer testServer.Close()
   491  	tCtx := ktesting.Init(t)
   492  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   493  	endpoints.endpointsStore.Add(&v1.Endpoints{
   494  		ObjectMeta: metav1.ObjectMeta{
   495  			Name:            "foo",
   496  			Namespace:       ns,
   497  			ResourceVersion: "1",
   498  			Labels: map[string]string{
   499  				v1.IsHeadlessService: "",
   500  			},
   501  		},
   502  		Subsets: []v1.EndpointSubset{},
   503  	})
   504  	endpoints.serviceStore.Add(&v1.Service{
   505  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   506  		Spec: v1.ServiceSpec{
   507  			Selector: map[string]string{"foo": "bar"},
   508  			Ports:    []v1.ServicePort{{Port: 80}},
   509  		},
   510  	})
   511  	err := endpoints.syncService(tCtx, ns+"/foo")
   512  	if err != nil {
   513  		t.Errorf("Unexpected error syncing service %v", err)
   514  	}
   515  
   516  	endpointsHandler.ValidateRequestCount(t, 0)
   517  }
   518  
   519  func TestSyncServiceExternalNameType(t *testing.T) {
   520  	serviceName := "testing-1"
   521  	namespace := metav1.NamespaceDefault
   522  
   523  	testCases := []struct {
   524  		desc    string
   525  		service *v1.Service
   526  	}{
   527  		{
   528  			desc: "External name with selector and ports should not receive endpoints",
   529  			service: &v1.Service{
   530  				ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
   531  				Spec: v1.ServiceSpec{
   532  					Selector: map[string]string{"foo": "bar"},
   533  					Ports:    []v1.ServicePort{{Port: 80}},
   534  					Type:     v1.ServiceTypeExternalName,
   535  				},
   536  			},
   537  		},
   538  		{
   539  			desc: "External name with ports should not receive endpoints",
   540  			service: &v1.Service{
   541  				ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
   542  				Spec: v1.ServiceSpec{
   543  					Ports: []v1.ServicePort{{Port: 80}},
   544  					Type:  v1.ServiceTypeExternalName,
   545  				},
   546  			},
   547  		},
   548  		{
   549  			desc: "External name with selector should not receive endpoints",
   550  			service: &v1.Service{
   551  				ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
   552  				Spec: v1.ServiceSpec{
   553  					Selector: map[string]string{"foo": "bar"},
   554  					Type:     v1.ServiceTypeExternalName,
   555  				},
   556  			},
   557  		},
   558  		{
   559  			desc: "External name without selector and ports should not receive endpoints",
   560  			service: &v1.Service{
   561  				ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
   562  				Spec: v1.ServiceSpec{
   563  					Type: v1.ServiceTypeExternalName,
   564  				},
   565  			},
   566  		},
   567  	}
   568  
   569  	for _, tc := range testCases {
   570  		t.Run(tc.desc, func(t *testing.T) {
   571  			testServer, endpointsHandler := makeTestServer(t, namespace)
   572  
   573  			defer testServer.Close()
   574  			tCtx := ktesting.Init(t)
   575  			endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   576  			err := endpoints.serviceStore.Add(tc.service)
   577  			if err != nil {
   578  				t.Fatalf("Error adding service to service store: %v", err)
   579  			}
   580  			err = endpoints.syncService(tCtx, namespace+"/"+serviceName)
   581  			if err != nil {
   582  				t.Fatalf("Error syncing service: %v", err)
   583  			}
   584  			endpointsHandler.ValidateRequestCount(t, 0)
   585  		})
   586  	}
   587  }
   588  
   589  func TestSyncEndpointsProtocolUDP(t *testing.T) {
   590  	ns := "other"
   591  	testServer, endpointsHandler := makeTestServer(t, ns)
   592  	defer testServer.Close()
   593  	tCtx := ktesting.Init(t)
   594  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   595  	endpoints.endpointsStore.Add(&v1.Endpoints{
   596  		ObjectMeta: metav1.ObjectMeta{
   597  			Name:            "foo",
   598  			Namespace:       ns,
   599  			ResourceVersion: "1",
   600  		},
   601  		Subsets: []v1.EndpointSubset{{
   602  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
   603  			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
   604  		}},
   605  	})
   606  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
   607  	endpoints.serviceStore.Add(&v1.Service{
   608  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   609  		Spec: v1.ServiceSpec{
   610  			Selector: map[string]string{},
   611  			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "UDP"}},
   612  		},
   613  	})
   614  	err := endpoints.syncService(tCtx, ns+"/foo")
   615  	if err != nil {
   616  		t.Errorf("Unexpected error syncing service %v", err)
   617  	}
   618  
   619  	endpointsHandler.ValidateRequestCount(t, 1)
   620  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   621  		ObjectMeta: metav1.ObjectMeta{
   622  			Name:            "foo",
   623  			Namespace:       ns,
   624  			ResourceVersion: "1",
   625  			Labels: map[string]string{
   626  				v1.IsHeadlessService: "",
   627  			},
   628  		},
   629  		Subsets: []v1.EndpointSubset{{
   630  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
   631  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
   632  		}},
   633  	})
   634  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
   635  }
   636  
   637  func TestSyncEndpointsProtocolSCTP(t *testing.T) {
   638  	ns := "other"
   639  	testServer, endpointsHandler := makeTestServer(t, ns)
   640  	defer testServer.Close()
   641  	tCtx := ktesting.Init(t)
   642  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   643  	endpoints.endpointsStore.Add(&v1.Endpoints{
   644  		ObjectMeta: metav1.ObjectMeta{
   645  			Name:            "foo",
   646  			Namespace:       ns,
   647  			ResourceVersion: "1",
   648  		},
   649  		Subsets: []v1.EndpointSubset{{
   650  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
   651  			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
   652  		}},
   653  	})
   654  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
   655  	endpoints.serviceStore.Add(&v1.Service{
   656  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   657  		Spec: v1.ServiceSpec{
   658  			Selector: map[string]string{},
   659  			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "SCTP"}},
   660  		},
   661  	})
   662  	err := endpoints.syncService(tCtx, ns+"/foo")
   663  	if err != nil {
   664  		t.Errorf("Unexpected error syncing service %v", err)
   665  	}
   666  
   667  	endpointsHandler.ValidateRequestCount(t, 1)
   668  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   669  		ObjectMeta: metav1.ObjectMeta{
   670  			Name:            "foo",
   671  			Namespace:       ns,
   672  			ResourceVersion: "1",
   673  			Labels: map[string]string{
   674  				v1.IsHeadlessService: "",
   675  			},
   676  		},
   677  		Subsets: []v1.EndpointSubset{{
   678  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
   679  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
   680  		}},
   681  	})
   682  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
   683  }
   684  
   685  func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
   686  	ns := "other"
   687  	testServer, endpointsHandler := makeTestServer(t, ns)
   688  	defer testServer.Close()
   689  	tCtx := ktesting.Init(t)
   690  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   691  	endpoints.endpointsStore.Add(&v1.Endpoints{
   692  		ObjectMeta: metav1.ObjectMeta{
   693  			Name:            "foo",
   694  			Namespace:       ns,
   695  			ResourceVersion: "1",
   696  		},
   697  		Subsets: []v1.EndpointSubset{},
   698  	})
   699  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
   700  	endpoints.serviceStore.Add(&v1.Service{
   701  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   702  		Spec: v1.ServiceSpec{
   703  			Selector: map[string]string{},
   704  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
   705  		},
   706  	})
   707  	err := endpoints.syncService(tCtx, ns+"/foo")
   708  	if err != nil {
   709  		t.Errorf("Unexpected error syncing service %v", err)
   710  	}
   711  
   712  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   713  		ObjectMeta: metav1.ObjectMeta{
   714  			Name:            "foo",
   715  			Namespace:       ns,
   716  			ResourceVersion: "1",
   717  			Labels: map[string]string{
   718  				v1.IsHeadlessService: "",
   719  			},
   720  		},
   721  		Subsets: []v1.EndpointSubset{{
   722  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
   723  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
   724  		}},
   725  	})
   726  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
   727  }
   728  
   729  func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
   730  	ns := "other"
   731  	testServer, endpointsHandler := makeTestServer(t, ns)
   732  	defer testServer.Close()
   733  
   734  	tCtx := ktesting.Init(t)
   735  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   736  	endpoints.endpointsStore.Add(&v1.Endpoints{
   737  		ObjectMeta: metav1.ObjectMeta{
   738  			Name:            "foo",
   739  			Namespace:       ns,
   740  			ResourceVersion: "1",
   741  		},
   742  		Subsets: []v1.EndpointSubset{},
   743  	})
   744  	addPods(endpoints.podStore, ns, 0, 1, 1, ipv4only)
   745  	endpoints.serviceStore.Add(&v1.Service{
   746  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   747  		Spec: v1.ServiceSpec{
   748  			Selector: map[string]string{},
   749  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
   750  		},
   751  	})
   752  	err := endpoints.syncService(tCtx, ns+"/foo")
   753  	if err != nil {
   754  		t.Errorf("Unexpected error syncing service %v", err)
   755  	}
   756  
   757  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   758  		ObjectMeta: metav1.ObjectMeta{
   759  			Name:            "foo",
   760  			Namespace:       ns,
   761  			ResourceVersion: "1",
   762  			Labels: map[string]string{
   763  				v1.IsHeadlessService: "",
   764  			},
   765  		},
   766  		Subsets: []v1.EndpointSubset{{
   767  			NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
   768  			Ports:             []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
   769  		}},
   770  	})
   771  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
   772  }
   773  
   774  func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
   775  	ns := "other"
   776  	testServer, endpointsHandler := makeTestServer(t, ns)
   777  	defer testServer.Close()
   778  
   779  	tCtx := ktesting.Init(t)
   780  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   781  	endpoints.endpointsStore.Add(&v1.Endpoints{
   782  		ObjectMeta: metav1.ObjectMeta{
   783  			Name:            "foo",
   784  			Namespace:       ns,
   785  			ResourceVersion: "1",
   786  		},
   787  		Subsets: []v1.EndpointSubset{},
   788  	})
   789  	addPods(endpoints.podStore, ns, 1, 1, 1, ipv4only)
   790  	endpoints.serviceStore.Add(&v1.Service{
   791  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   792  		Spec: v1.ServiceSpec{
   793  			Selector: map[string]string{},
   794  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
   795  		},
   796  	})
   797  	err := endpoints.syncService(tCtx, ns+"/foo")
   798  	if err != nil {
   799  		t.Errorf("Unexpected error syncing service %v", err)
   800  	}
   801  
   802  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   803  		ObjectMeta: metav1.ObjectMeta{
   804  			Name:            "foo",
   805  			Namespace:       ns,
   806  			ResourceVersion: "1",
   807  			Labels: map[string]string{
   808  				v1.IsHeadlessService: "",
   809  			},
   810  		},
   811  		Subsets: []v1.EndpointSubset{{
   812  			Addresses:         []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
   813  			NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
   814  			Ports:             []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
   815  		}},
   816  	})
   817  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
   818  }
   819  
   820  func TestSyncEndpointsItemsPreexisting(t *testing.T) {
   821  	ns := "bar"
   822  	testServer, endpointsHandler := makeTestServer(t, ns)
   823  	defer testServer.Close()
   824  	tCtx := ktesting.Init(t)
   825  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   826  	endpoints.endpointsStore.Add(&v1.Endpoints{
   827  		ObjectMeta: metav1.ObjectMeta{
   828  			Name:            "foo",
   829  			Namespace:       ns,
   830  			ResourceVersion: "1",
   831  		},
   832  		Subsets: []v1.EndpointSubset{{
   833  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
   834  			Ports:     []v1.EndpointPort{{Port: 1000}},
   835  		}},
   836  	})
   837  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
   838  	endpoints.serviceStore.Add(&v1.Service{
   839  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   840  		Spec: v1.ServiceSpec{
   841  			Selector: map[string]string{"foo": "bar"},
   842  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
   843  		},
   844  	})
   845  	err := endpoints.syncService(tCtx, ns+"/foo")
   846  	if err != nil {
   847  		t.Errorf("Unexpected error syncing service %v", err)
   848  	}
   849  
   850  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   851  		ObjectMeta: metav1.ObjectMeta{
   852  			Name:            "foo",
   853  			Namespace:       ns,
   854  			ResourceVersion: "1",
   855  			Labels: map[string]string{
   856  				v1.IsHeadlessService: "",
   857  			},
   858  		},
   859  		Subsets: []v1.EndpointSubset{{
   860  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
   861  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
   862  		}},
   863  	})
   864  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
   865  }
   866  
   867  func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
   868  	ns := metav1.NamespaceDefault
   869  	testServer, endpointsHandler := makeTestServer(t, ns)
   870  	defer testServer.Close()
   871  	tCtx := ktesting.Init(t)
   872  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   873  	endpoints.endpointsStore.Add(&v1.Endpoints{
   874  		ObjectMeta: metav1.ObjectMeta{
   875  			ResourceVersion: "1",
   876  			Name:            "foo",
   877  			Namespace:       ns,
   878  		},
   879  		Subsets: []v1.EndpointSubset{{
   880  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
   881  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
   882  		}},
   883  	})
   884  	addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, ipv4only)
   885  	endpoints.serviceStore.Add(&v1.Service{
   886  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
   887  		Spec: v1.ServiceSpec{
   888  			Selector: map[string]string{"foo": "bar"},
   889  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
   890  		},
   891  	})
   892  	err := endpoints.syncService(tCtx, ns+"/foo")
   893  	if err != nil {
   894  		t.Errorf("Unexpected error syncing service %v", err)
   895  	}
   896  	endpointsHandler.ValidateRequestCount(t, 0)
   897  }
   898  
   899  func TestSyncEndpointsItems(t *testing.T) {
   900  	ns := "other"
   901  	testServer, endpointsHandler := makeTestServer(t, ns)
   902  	defer testServer.Close()
   903  	tCtx := ktesting.Init(t)
   904  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   905  	addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
   906  	addPods(endpoints.podStore, "blah", 5, 2, 0, ipv4only) // make sure these aren't found!
   907  
   908  	endpoints.serviceStore.Add(&v1.Service{
   909  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
   910  		Spec: v1.ServiceSpec{
   911  			Selector: map[string]string{"foo": "bar"},
   912  			Ports: []v1.ServicePort{
   913  				{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
   914  				{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt32(8088)},
   915  			},
   916  		},
   917  	})
   918  	err := endpoints.syncService(tCtx, "other/foo")
   919  	if err != nil {
   920  		t.Errorf("Unexpected error syncing service %v", err)
   921  	}
   922  
   923  	expectedSubsets := []v1.EndpointSubset{{
   924  		Addresses: []v1.EndpointAddress{
   925  			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
   926  			{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
   927  			{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
   928  		},
   929  		Ports: []v1.EndpointPort{
   930  			{Name: "port0", Port: 8080, Protocol: "TCP"},
   931  			{Name: "port1", Port: 8088, Protocol: "TCP"},
   932  		},
   933  	}}
   934  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   935  		ObjectMeta: metav1.ObjectMeta{
   936  			ResourceVersion: "",
   937  			Name:            "foo",
   938  			Labels: map[string]string{
   939  				v1.IsHeadlessService: "",
   940  			},
   941  		},
   942  		Subsets: endptspkg.SortSubsets(expectedSubsets),
   943  	})
   944  	endpointsHandler.ValidateRequestCount(t, 1)
   945  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
   946  }
   947  
   948  func TestSyncEndpointsItemsWithLabels(t *testing.T) {
   949  	ns := "other"
   950  	testServer, endpointsHandler := makeTestServer(t, ns)
   951  	defer testServer.Close()
   952  	tCtx := ktesting.Init(t)
   953  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
   954  	addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
   955  	serviceLabels := map[string]string{"foo": "bar"}
   956  	endpoints.serviceStore.Add(&v1.Service{
   957  		ObjectMeta: metav1.ObjectMeta{
   958  			Name:      "foo",
   959  			Namespace: ns,
   960  			Labels:    serviceLabels,
   961  		},
   962  		Spec: v1.ServiceSpec{
   963  			Selector: map[string]string{"foo": "bar"},
   964  			Ports: []v1.ServicePort{
   965  				{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
   966  				{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt32(8088)},
   967  			},
   968  		},
   969  	})
   970  	err := endpoints.syncService(tCtx, ns+"/foo")
   971  	if err != nil {
   972  		t.Errorf("Unexpected error syncing service %v", err)
   973  	}
   974  
   975  	expectedSubsets := []v1.EndpointSubset{{
   976  		Addresses: []v1.EndpointAddress{
   977  			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
   978  			{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
   979  			{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
   980  		},
   981  		Ports: []v1.EndpointPort{
   982  			{Name: "port0", Port: 8080, Protocol: "TCP"},
   983  			{Name: "port1", Port: 8088, Protocol: "TCP"},
   984  		},
   985  	}}
   986  
   987  	serviceLabels[v1.IsHeadlessService] = ""
   988  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
   989  		ObjectMeta: metav1.ObjectMeta{
   990  			ResourceVersion: "",
   991  			Name:            "foo",
   992  			Labels:          serviceLabels,
   993  		},
   994  		Subsets: endptspkg.SortSubsets(expectedSubsets),
   995  	})
   996  	endpointsHandler.ValidateRequestCount(t, 1)
   997  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
   998  }
   999  
  1000  func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
  1001  	ns := "bar"
  1002  	testServer, endpointsHandler := makeTestServer(t, ns)
  1003  	defer testServer.Close()
  1004  	tCtx := ktesting.Init(t)
  1005  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1006  	endpoints.endpointsStore.Add(&v1.Endpoints{
  1007  		ObjectMeta: metav1.ObjectMeta{
  1008  			Name:            "foo",
  1009  			Namespace:       ns,
  1010  			ResourceVersion: "1",
  1011  			Labels: map[string]string{
  1012  				"foo": "bar",
  1013  			},
  1014  		},
  1015  		Subsets: []v1.EndpointSubset{{
  1016  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1017  			Ports:     []v1.EndpointPort{{Port: 1000}},
  1018  		}},
  1019  	})
  1020  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
  1021  	serviceLabels := map[string]string{"baz": "blah"}
  1022  	endpoints.serviceStore.Add(&v1.Service{
  1023  		ObjectMeta: metav1.ObjectMeta{
  1024  			Name:      "foo",
  1025  			Namespace: ns,
  1026  			Labels:    serviceLabels,
  1027  		},
  1028  		Spec: v1.ServiceSpec{
  1029  			Selector: map[string]string{"foo": "bar"},
  1030  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
  1031  		},
  1032  	})
  1033  	err := endpoints.syncService(tCtx, ns+"/foo")
  1034  	if err != nil {
  1035  		t.Errorf("Unexpected error syncing service %v", err)
  1036  	}
  1037  
  1038  	serviceLabels[v1.IsHeadlessService] = ""
  1039  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1040  		ObjectMeta: metav1.ObjectMeta{
  1041  			Name:            "foo",
  1042  			Namespace:       ns,
  1043  			ResourceVersion: "1",
  1044  			Labels:          serviceLabels,
  1045  		},
  1046  		Subsets: []v1.EndpointSubset{{
  1047  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1048  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1049  		}},
  1050  	})
  1051  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1052  }
  1053  
  1054  func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
  1055  	var tests = []struct {
  1056  		podsSynced            func() bool
  1057  		servicesSynced        func() bool
  1058  		endpointsSynced       func() bool
  1059  		shouldUpdateEndpoints bool
  1060  	}{
  1061  		{neverReady, alwaysReady, alwaysReady, false},
  1062  		{alwaysReady, neverReady, alwaysReady, false},
  1063  		{alwaysReady, alwaysReady, neverReady, false},
  1064  		{alwaysReady, alwaysReady, alwaysReady, true},
  1065  	}
  1066  
  1067  	for _, test := range tests {
  1068  		func() {
  1069  			ns := "other"
  1070  			testServer, endpointsHandler := makeTestServer(t, ns)
  1071  			defer testServer.Close()
  1072  			tCtx := ktesting.Init(t)
  1073  			endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1074  			addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
  1075  
  1076  			service := &v1.Service{
  1077  				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1078  				Spec: v1.ServiceSpec{
  1079  					Selector: map[string]string{},
  1080  					Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
  1081  				},
  1082  			}
  1083  			endpoints.serviceStore.Add(service)
  1084  			endpoints.onServiceUpdate(service)
  1085  			endpoints.podsSynced = test.podsSynced
  1086  			endpoints.servicesSynced = test.servicesSynced
  1087  			endpoints.endpointsSynced = test.endpointsSynced
  1088  			endpoints.workerLoopPeriod = 10 * time.Millisecond
  1089  			go endpoints.Run(tCtx, 1)
  1090  
  1091  			// cache.WaitForNamedCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
  1092  			// To ensure we get all updates, including unexpected ones, we need to wait at least as long as
  1093  			// a single cache sync period and worker period, with some fudge room.
  1094  			time.Sleep(150 * time.Millisecond)
  1095  			if test.shouldUpdateEndpoints {
  1096  				// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
  1097  				wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
  1098  					return endpoints.queue.Len() == 0, nil
  1099  				})
  1100  				endpointsHandler.ValidateRequestCount(t, 1)
  1101  			} else {
  1102  				endpointsHandler.ValidateRequestCount(t, 0)
  1103  			}
  1104  		}()
  1105  	}
  1106  }
  1107  
  1108  func TestSyncEndpointsHeadlessService(t *testing.T) {
  1109  	ns := "headless"
  1110  	testServer, endpointsHandler := makeTestServer(t, ns)
  1111  	defer testServer.Close()
  1112  	tCtx := ktesting.Init(t)
  1113  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1114  	endpoints.endpointsStore.Add(&v1.Endpoints{
  1115  		ObjectMeta: metav1.ObjectMeta{
  1116  			Name:            "foo",
  1117  			Namespace:       ns,
  1118  			ResourceVersion: "1",
  1119  		},
  1120  		Subsets: []v1.EndpointSubset{{
  1121  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1122  			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1123  		}},
  1124  	})
  1125  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
  1126  	service := &v1.Service{
  1127  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}},
  1128  		Spec: v1.ServiceSpec{
  1129  			Selector:  map[string]string{},
  1130  			ClusterIP: api.ClusterIPNone,
  1131  			Ports:     []v1.ServicePort{},
  1132  		},
  1133  	}
  1134  	originalService := service.DeepCopy()
  1135  	endpoints.serviceStore.Add(service)
  1136  	err := endpoints.syncService(tCtx, ns+"/foo")
  1137  	if err != nil {
  1138  		t.Errorf("Unexpected error syncing service %v", err)
  1139  	}
  1140  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1141  		ObjectMeta: metav1.ObjectMeta{
  1142  			Name:            "foo",
  1143  			Namespace:       ns,
  1144  			ResourceVersion: "1",
  1145  			Labels: map[string]string{
  1146  				"a":                  "b",
  1147  				v1.IsHeadlessService: "",
  1148  			},
  1149  		},
  1150  		Subsets: []v1.EndpointSubset{{
  1151  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1152  			Ports:     []v1.EndpointPort{},
  1153  		}},
  1154  	})
  1155  	if !reflect.DeepEqual(originalService, service) {
  1156  		t.Fatalf("syncing endpoints changed service: %s", cmp.Diff(service, originalService))
  1157  	}
  1158  	endpointsHandler.ValidateRequestCount(t, 1)
  1159  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1160  }
  1161  
  1162  func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
  1163  	ns := "other"
  1164  	testServer, endpointsHandler := makeTestServer(t, ns)
  1165  	defer testServer.Close()
  1166  
  1167  	tCtx := ktesting.Init(t)
  1168  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1169  	endpoints.endpointsStore.Add(&v1.Endpoints{
  1170  		ObjectMeta: metav1.ObjectMeta{
  1171  			Name:            "foo",
  1172  			Namespace:       ns,
  1173  			ResourceVersion: "1",
  1174  			Labels: map[string]string{
  1175  				"foo": "bar",
  1176  			},
  1177  		},
  1178  		Subsets: []v1.EndpointSubset{},
  1179  	})
  1180  	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
  1181  	endpoints.serviceStore.Add(&v1.Service{
  1182  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1183  		Spec: v1.ServiceSpec{
  1184  			Selector: map[string]string{"foo": "bar"},
  1185  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
  1186  		},
  1187  	})
  1188  	err := endpoints.syncService(tCtx, ns+"/foo")
  1189  	if err != nil {
  1190  		t.Errorf("Unexpected error syncing service %v", err)
  1191  	}
  1192  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1193  		ObjectMeta: metav1.ObjectMeta{
  1194  			Name:            "foo",
  1195  			Namespace:       ns,
  1196  			ResourceVersion: "1",
  1197  			Labels: map[string]string{
  1198  				v1.IsHeadlessService: "",
  1199  			},
  1200  		},
  1201  		Subsets: []v1.EndpointSubset{},
  1202  	})
  1203  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1204  }
  1205  
  1206  func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
  1207  	ns := "other"
  1208  	testServer, endpointsHandler := makeTestServer(t, ns)
  1209  	defer testServer.Close()
  1210  
  1211  	tCtx := ktesting.Init(t)
  1212  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1213  	endpoints.endpointsStore.Add(&v1.Endpoints{
  1214  		ObjectMeta: metav1.ObjectMeta{
  1215  			Name:            "foo",
  1216  			Namespace:       ns,
  1217  			ResourceVersion: "1",
  1218  			Labels: map[string]string{
  1219  				"foo": "bar",
  1220  			},
  1221  		},
  1222  		Subsets: []v1.EndpointSubset{},
  1223  	})
  1224  	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
  1225  	endpoints.serviceStore.Add(&v1.Service{
  1226  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1227  		Spec: v1.ServiceSpec{
  1228  			Selector: map[string]string{"foo": "bar"},
  1229  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
  1230  		},
  1231  	})
  1232  	err := endpoints.syncService(tCtx, ns+"/foo")
  1233  	if err != nil {
  1234  		t.Errorf("Unexpected error syncing service %v", err)
  1235  	}
  1236  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1237  		ObjectMeta: metav1.ObjectMeta{
  1238  			Name:            "foo",
  1239  			Namespace:       ns,
  1240  			ResourceVersion: "1",
  1241  			Labels: map[string]string{
  1242  				v1.IsHeadlessService: "",
  1243  			},
  1244  		},
  1245  		Subsets: []v1.EndpointSubset{},
  1246  	})
  1247  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1248  }
  1249  
  1250  func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
  1251  	ns := "other"
  1252  	testServer, endpointsHandler := makeTestServer(t, ns)
  1253  	defer testServer.Close()
  1254  
  1255  	tCtx := ktesting.Init(t)
  1256  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1257  	endpoints.endpointsStore.Add(&v1.Endpoints{
  1258  		ObjectMeta: metav1.ObjectMeta{
  1259  			Name:            "foo",
  1260  			Namespace:       ns,
  1261  			ResourceVersion: "1",
  1262  			Labels: map[string]string{
  1263  				"foo": "bar",
  1264  			},
  1265  		},
  1266  		Subsets: []v1.EndpointSubset{},
  1267  	})
  1268  	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
  1269  	endpoints.serviceStore.Add(&v1.Service{
  1270  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1271  		Spec: v1.ServiceSpec{
  1272  			Selector: map[string]string{"foo": "bar"},
  1273  			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
  1274  		},
  1275  	})
  1276  	err := endpoints.syncService(tCtx, ns+"/foo")
  1277  	if err != nil {
  1278  		t.Errorf("Unexpected error syncing service %v", err)
  1279  	}
  1280  
  1281  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1282  		ObjectMeta: metav1.ObjectMeta{
  1283  			Name:            "foo",
  1284  			Namespace:       ns,
  1285  			ResourceVersion: "1",
  1286  			Labels: map[string]string{
  1287  				v1.IsHeadlessService: "",
  1288  			},
  1289  		},
  1290  		Subsets: []v1.EndpointSubset{},
  1291  	})
  1292  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1293  }
  1294  
  1295  func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
  1296  	ns := metav1.NamespaceDefault
  1297  	testServer, endpointsHandler := makeTestServer(t, ns)
  1298  	defer testServer.Close()
  1299  	tCtx := ktesting.Init(t)
  1300  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1301  	endpoints.serviceStore.Add(&v1.Service{
  1302  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1303  		Spec: v1.ServiceSpec{
  1304  			Selector:  map[string]string{"foo": "bar"},
  1305  			ClusterIP: "None",
  1306  			Ports:     nil,
  1307  		},
  1308  	})
  1309  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
  1310  
  1311  	err := endpoints.syncService(tCtx, ns+"/foo")
  1312  	if err != nil {
  1313  		t.Errorf("Unexpected error syncing service %v", err)
  1314  	}
  1315  	endpointsHandler.ValidateRequestCount(t, 1)
  1316  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1317  		ObjectMeta: metav1.ObjectMeta{
  1318  			Name: "foo",
  1319  			Labels: map[string]string{
  1320  				v1.IsHeadlessService: "",
  1321  			},
  1322  		},
  1323  		Subsets: []v1.EndpointSubset{{
  1324  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1325  			Ports:     nil,
  1326  		}},
  1327  	})
  1328  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
  1329  }
  1330  
  1331  func TestPodToEndpointAddressForService(t *testing.T) {
  1332  	ipv4 := v1.IPv4Protocol
  1333  	ipv6 := v1.IPv6Protocol
  1334  
  1335  	testCases := []struct {
  1336  		name                   string
  1337  		ipFamilies             []v1.IPFamily
  1338  		service                v1.Service
  1339  		expectedEndpointFamily v1.IPFamily
  1340  		expectError            bool
  1341  	}{
  1342  		{
  1343  			name:       "v4 service, in a single stack cluster",
  1344  			ipFamilies: ipv4only,
  1345  			service: v1.Service{
  1346  				Spec: v1.ServiceSpec{
  1347  					ClusterIP: "10.0.0.1",
  1348  				},
  1349  			},
  1350  			expectedEndpointFamily: ipv4,
  1351  		},
  1352  		{
  1353  			name:       "v4 service, in a dual stack cluster",
  1354  			ipFamilies: ipv4ipv6,
  1355  			service: v1.Service{
  1356  				Spec: v1.ServiceSpec{
  1357  					ClusterIP: "10.0.0.1",
  1358  				},
  1359  			},
  1360  			expectedEndpointFamily: ipv4,
  1361  		},
  1362  		{
  1363  			name:       "v4 service, in a dual stack ipv6-primary cluster",
  1364  			ipFamilies: ipv6ipv4,
  1365  			service: v1.Service{
  1366  				Spec: v1.ServiceSpec{
  1367  					ClusterIP: "10.0.0.1",
  1368  				},
  1369  			},
  1370  			expectedEndpointFamily: ipv4,
  1371  		},
  1372  		{
  1373  			name:       "v4 headless service, in a single stack cluster",
  1374  			ipFamilies: ipv4only,
  1375  			service: v1.Service{
  1376  				Spec: v1.ServiceSpec{
  1377  					ClusterIP: v1.ClusterIPNone,
  1378  				},
  1379  			},
  1380  			expectedEndpointFamily: ipv4,
  1381  		},
  1382  		{
  1383  			name:       "v4 headless service, in a dual stack cluster",
  1384  			ipFamilies: ipv4ipv6,
  1385  			service: v1.Service{
  1386  				Spec: v1.ServiceSpec{
  1387  					ClusterIP:  v1.ClusterIPNone,
  1388  					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
  1389  				},
  1390  			},
  1391  			expectedEndpointFamily: ipv4,
  1392  		},
  1393  		{
  1394  			name:       "v4 legacy headless service, in a dual stack cluster",
  1395  			ipFamilies: ipv4ipv6,
  1396  			service: v1.Service{
  1397  				Spec: v1.ServiceSpec{
  1398  					ClusterIP: v1.ClusterIPNone,
  1399  				},
  1400  			},
  1401  			expectedEndpointFamily: ipv4,
  1402  		},
  1403  		{
  1404  			name:       "v4 legacy headless service, in a dual stack ipv6-primary cluster",
  1405  			ipFamilies: ipv6ipv4,
  1406  			service: v1.Service{
  1407  				Spec: v1.ServiceSpec{
  1408  					ClusterIP: v1.ClusterIPNone,
  1409  				},
  1410  			},
  1411  			expectedEndpointFamily: ipv6,
  1412  		},
  1413  		{
  1414  			name:       "v6 service, in a dual stack cluster",
  1415  			ipFamilies: ipv4ipv6,
  1416  			service: v1.Service{
  1417  				Spec: v1.ServiceSpec{
  1418  					ClusterIP: "3000::1",
  1419  				},
  1420  			},
  1421  			expectedEndpointFamily: ipv6,
  1422  		},
  1423  		{
  1424  			name:       "v6 headless service, in a single stack cluster",
  1425  			ipFamilies: ipv6only,
  1426  			service: v1.Service{
  1427  				Spec: v1.ServiceSpec{
  1428  					ClusterIP: v1.ClusterIPNone,
  1429  				},
  1430  			},
  1431  			expectedEndpointFamily: ipv6,
  1432  		},
  1433  		{
  1434  			name:       "v6 headless service, in a dual stack cluster (connected to a new api-server)",
  1435  			ipFamilies: ipv4ipv6,
  1436  			service: v1.Service{
  1437  				Spec: v1.ServiceSpec{
  1438  					ClusterIP:  v1.ClusterIPNone,
  1439  					IPFamilies: []v1.IPFamily{v1.IPv6Protocol}, // <- set by a api-server defaulting logic
  1440  				},
  1441  			},
  1442  			expectedEndpointFamily: ipv6,
  1443  		},
  1444  		{
  1445  			name:       "v6 legacy headless service, in a dual stack cluster  (connected to a old api-server)",
  1446  			ipFamilies: ipv4ipv6,
  1447  			service: v1.Service{
  1448  				Spec: v1.ServiceSpec{
  1449  					ClusterIP: v1.ClusterIPNone, // <- families are not set by api-server
  1450  				},
  1451  			},
  1452  			expectedEndpointFamily: ipv4,
  1453  		},
  1454  		// in reality this is a misconfigured cluster
  1455  		// i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6
  1456  		// previously controller could assign wrong ip to endpoint address
  1457  		// with gate removed. this is no longer the case. this is *not* behavior change
  1458  		// because previously things would have failed in kube-proxy anyway (due to editing wrong iptables).
  1459  		{
  1460  			name:       "v6 service, in a v4 only cluster.",
  1461  			ipFamilies: ipv4only,
  1462  			service: v1.Service{
  1463  				Spec: v1.ServiceSpec{
  1464  					ClusterIP: "3000::1",
  1465  				},
  1466  			},
  1467  			expectError:            true,
  1468  			expectedEndpointFamily: ipv4,
  1469  		},
  1470  		// but this will actually give an error
  1471  		{
  1472  			name:       "v6 service, in a v4 only cluster",
  1473  			ipFamilies: ipv4only,
  1474  			service: v1.Service{
  1475  				Spec: v1.ServiceSpec{
  1476  					ClusterIP: "3000::1",
  1477  				},
  1478  			},
  1479  			expectError: true,
  1480  		},
  1481  	}
  1482  	for _, tc := range testCases {
  1483  		t.Run(tc.name, func(t *testing.T) {
  1484  			podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
  1485  			ns := "test"
  1486  			addPods(podStore, ns, 1, 1, 0, tc.ipFamilies)
  1487  			pods := podStore.List()
  1488  			if len(pods) != 1 {
  1489  				t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
  1490  			}
  1491  			pod := pods[0].(*v1.Pod)
  1492  			epa, err := podToEndpointAddressForService(&tc.service, pod)
  1493  
  1494  			if err != nil && !tc.expectError {
  1495  				t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
  1496  			}
  1497  
  1498  			if err == nil && tc.expectError {
  1499  				t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
  1500  			}
  1501  
  1502  			if err != nil && tc.expectError {
  1503  				return
  1504  			}
  1505  
  1506  			if utilnet.IsIPv6String(epa.IP) != (tc.expectedEndpointFamily == ipv6) {
  1507  				t.Fatalf("IP: expected %s, got: %s", tc.expectedEndpointFamily, epa.IP)
  1508  			}
  1509  			if *(epa.NodeName) != pod.Spec.NodeName {
  1510  				t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
  1511  			}
  1512  			if epa.TargetRef.Kind != "Pod" {
  1513  				t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
  1514  			}
  1515  			if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
  1516  				t.Fatalf("TargetRef.Namespace: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
  1517  			}
  1518  			if epa.TargetRef.Name != pod.ObjectMeta.Name {
  1519  				t.Fatalf("TargetRef.Name: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
  1520  			}
  1521  			if epa.TargetRef.UID != pod.ObjectMeta.UID {
  1522  				t.Fatalf("TargetRef.UID: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
  1523  			}
  1524  			if epa.TargetRef.ResourceVersion != "" {
  1525  				t.Fatalf("TargetRef.ResourceVersion: expected empty, got: %s", epa.TargetRef.ResourceVersion)
  1526  			}
  1527  		})
  1528  	}
  1529  
  1530  }
  1531  
  1532  func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
  1533  	ns := "other"
  1534  	testServer, endpointsHandler := makeTestServer(t, ns)
  1535  	defer testServer.Close()
  1536  
  1537  	tCtx := ktesting.Init(t)
  1538  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1539  	endpoints.endpointsStore.Add(&v1.Endpoints{
  1540  		ObjectMeta: metav1.ObjectMeta{
  1541  			Name:            "foo",
  1542  			Namespace:       ns,
  1543  			ResourceVersion: "1",
  1544  		},
  1545  		Subsets: []v1.EndpointSubset{{
  1546  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1547  			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1548  		}},
  1549  	})
  1550  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
  1551  	endpoints.serviceStore.Add(&v1.Service{
  1552  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
  1553  		Spec: v1.ServiceSpec{
  1554  			Selector: map[string]string{},
  1555  			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
  1556  		},
  1557  	})
  1558  	err := endpoints.syncService(tCtx, ns+"/foo")
  1559  	if err != nil {
  1560  		t.Errorf("Unexpected error syncing service %v", err)
  1561  	}
  1562  
  1563  	endpointsHandler.ValidateRequestCount(t, 1)
  1564  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1565  		ObjectMeta: metav1.ObjectMeta{
  1566  			Name:            "foo",
  1567  			Namespace:       ns,
  1568  			ResourceVersion: "1",
  1569  			Annotations: map[string]string{
  1570  				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1571  			},
  1572  			Labels: map[string]string{
  1573  				v1.IsHeadlessService: "",
  1574  			},
  1575  		},
  1576  		Subsets: []v1.EndpointSubset{{
  1577  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1578  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1579  		}},
  1580  	})
  1581  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1582  }
  1583  
  1584  func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
  1585  	ns := "other"
  1586  	testServer, endpointsHandler := makeTestServer(t, ns)
  1587  	defer testServer.Close()
  1588  
  1589  	tCtx := ktesting.Init(t)
  1590  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1591  	endpoints.endpointsStore.Add(&v1.Endpoints{
  1592  		ObjectMeta: metav1.ObjectMeta{
  1593  			Name:            "foo",
  1594  			Namespace:       ns,
  1595  			ResourceVersion: "1",
  1596  			Annotations: map[string]string{
  1597  				v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
  1598  			},
  1599  		},
  1600  		Subsets: []v1.EndpointSubset{{
  1601  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1602  			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1603  		}},
  1604  	})
  1605  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
  1606  	endpoints.serviceStore.Add(&v1.Service{
  1607  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
  1608  		Spec: v1.ServiceSpec{
  1609  			Selector: map[string]string{},
  1610  			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
  1611  		},
  1612  	})
  1613  	err := endpoints.syncService(tCtx, ns+"/foo")
  1614  	if err != nil {
  1615  		t.Errorf("Unexpected error syncing service %v", err)
  1616  	}
  1617  
  1618  	endpointsHandler.ValidateRequestCount(t, 1)
  1619  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1620  		ObjectMeta: metav1.ObjectMeta{
  1621  			Name:            "foo",
  1622  			Namespace:       ns,
  1623  			ResourceVersion: "1",
  1624  			Annotations: map[string]string{
  1625  				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1626  			},
  1627  			Labels: map[string]string{
  1628  				v1.IsHeadlessService: "",
  1629  			},
  1630  		},
  1631  		Subsets: []v1.EndpointSubset{{
  1632  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1633  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1634  		}},
  1635  	})
  1636  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1637  }
  1638  
  1639  func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
  1640  	ns := "other"
  1641  	testServer, endpointsHandler := makeTestServer(t, ns)
  1642  	defer testServer.Close()
  1643  
  1644  	tCtx := ktesting.Init(t)
  1645  	endpoints := newController(tCtx, testServer.URL, 0*time.Second)
  1646  	endpoints.endpointsStore.Add(&v1.Endpoints{
  1647  		ObjectMeta: metav1.ObjectMeta{
  1648  			Name:            "foo",
  1649  			Namespace:       ns,
  1650  			ResourceVersion: "1",
  1651  			Annotations: map[string]string{
  1652  				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1653  			},
  1654  		},
  1655  		Subsets: []v1.EndpointSubset{{
  1656  			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1657  			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1658  		}},
  1659  	})
  1660  	// Neither pod nor service has trigger time, this should cause annotation to be cleared.
  1661  	addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
  1662  	endpoints.serviceStore.Add(&v1.Service{
  1663  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1664  		Spec: v1.ServiceSpec{
  1665  			Selector: map[string]string{},
  1666  			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
  1667  		},
  1668  	})
  1669  	err := endpoints.syncService(tCtx, ns+"/foo")
  1670  	if err != nil {
  1671  		t.Errorf("Unexpected error syncing service %v", err)
  1672  	}
  1673  
  1674  	endpointsHandler.ValidateRequestCount(t, 1)
  1675  	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1676  		ObjectMeta: metav1.ObjectMeta{
  1677  			Name:            "foo",
  1678  			Namespace:       ns,
  1679  			ResourceVersion: "1",
  1680  			Labels: map[string]string{
  1681  				v1.IsHeadlessService: "",
  1682  			}, // Annotation not set anymore.
  1683  		},
  1684  		Subsets: []v1.EndpointSubset{{
  1685  			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1686  			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1687  		}},
  1688  	})
  1689  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1690  }
  1691  
  1692  // TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
  1693  // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
  1694  // TODO(mborsz): Migrate this test to mock clock when possible.
  1695  func TestPodUpdatesBatching(t *testing.T) {
  1696  	type podUpdate struct {
  1697  		delay   time.Duration
  1698  		podName string
  1699  		podIP   string
  1700  	}
  1701  
  1702  	tests := []struct {
  1703  		name             string
  1704  		batchPeriod      time.Duration
  1705  		podsCount        int
  1706  		updates          []podUpdate
  1707  		finalDelay       time.Duration
  1708  		wantRequestCount int
  1709  	}{
  1710  		{
  1711  			name:        "three updates with no batching",
  1712  			batchPeriod: 0 * time.Second,
  1713  			podsCount:   10,
  1714  			updates: []podUpdate{
  1715  				{
  1716  					// endpoints.Run needs ~100 ms to start processing updates.
  1717  					delay:   200 * time.Millisecond,
  1718  					podName: "pod0",
  1719  					podIP:   "10.0.0.0",
  1720  				},
  1721  				{
  1722  					delay:   100 * time.Millisecond,
  1723  					podName: "pod1",
  1724  					podIP:   "10.0.0.1",
  1725  				},
  1726  				{
  1727  					delay:   100 * time.Millisecond,
  1728  					podName: "pod2",
  1729  					podIP:   "10.0.0.2",
  1730  				},
  1731  			},
  1732  			finalDelay:       3 * time.Second,
  1733  			wantRequestCount: 3,
  1734  		},
  1735  		{
  1736  			name:        "three updates in one batch",
  1737  			batchPeriod: 1 * time.Second,
  1738  			podsCount:   10,
  1739  			updates: []podUpdate{
  1740  				{
  1741  					// endpoints.Run needs ~100 ms to start processing updates.
  1742  					delay:   200 * time.Millisecond,
  1743  					podName: "pod0",
  1744  					podIP:   "10.0.0.0",
  1745  				},
  1746  				{
  1747  					delay:   100 * time.Millisecond,
  1748  					podName: "pod1",
  1749  					podIP:   "10.0.0.1",
  1750  				},
  1751  				{
  1752  					delay:   100 * time.Millisecond,
  1753  					podName: "pod2",
  1754  					podIP:   "10.0.0.2",
  1755  				},
  1756  			},
  1757  			finalDelay:       3 * time.Second,
  1758  			wantRequestCount: 1,
  1759  		},
  1760  		{
  1761  			name:        "three updates in two batches",
  1762  			batchPeriod: 1 * time.Second,
  1763  			podsCount:   10,
  1764  			updates: []podUpdate{
  1765  				{
  1766  					// endpoints.Run needs ~100 ms to start processing updates.
  1767  					delay:   200 * time.Millisecond,
  1768  					podName: "pod0",
  1769  					podIP:   "10.0.0.0",
  1770  				},
  1771  				{
  1772  					delay:   100 * time.Millisecond,
  1773  					podName: "pod1",
  1774  					podIP:   "10.0.0.1",
  1775  				},
  1776  				{
  1777  					delay:   1 * time.Second,
  1778  					podName: "pod2",
  1779  					podIP:   "10.0.0.2",
  1780  				},
  1781  			},
  1782  			finalDelay:       3 * time.Second,
  1783  			wantRequestCount: 2,
  1784  		},
  1785  	}
  1786  
  1787  	for _, tc := range tests {
  1788  		t.Run(tc.name, func(t *testing.T) {
  1789  			ns := "other"
  1790  			resourceVersion := 1
  1791  			testServer, endpointsHandler := makeTestServer(t, ns)
  1792  			defer testServer.Close()
  1793  
  1794  			tCtx := ktesting.Init(t)
  1795  			endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
  1796  			endpoints.podsSynced = alwaysReady
  1797  			endpoints.servicesSynced = alwaysReady
  1798  			endpoints.endpointsSynced = alwaysReady
  1799  			endpoints.workerLoopPeriod = 10 * time.Millisecond
  1800  
  1801  			go endpoints.Run(tCtx, 1)
  1802  
  1803  			addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)
  1804  
  1805  			endpoints.serviceStore.Add(&v1.Service{
  1806  				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1807  				Spec: v1.ServiceSpec{
  1808  					Selector: map[string]string{"foo": "bar"},
  1809  					Ports:    []v1.ServicePort{{Port: 80}},
  1810  				},
  1811  			})
  1812  
  1813  			for _, update := range tc.updates {
  1814  				time.Sleep(update.delay)
  1815  
  1816  				old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
  1817  				if err != nil {
  1818  					t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
  1819  				}
  1820  				if !exists {
  1821  					t.Fatalf("Pod %q doesn't exist", update.podName)
  1822  				}
  1823  				oldPod := old.(*v1.Pod)
  1824  				newPod := oldPod.DeepCopy()
  1825  				newPod.Status.PodIP = update.podIP
  1826  				newPod.Status.PodIPs[0].IP = update.podIP
  1827  				newPod.ResourceVersion = strconv.Itoa(resourceVersion)
  1828  				resourceVersion++
  1829  
  1830  				endpoints.podStore.Update(newPod)
  1831  				endpoints.updatePod(oldPod, newPod)
  1832  			}
  1833  
  1834  			time.Sleep(tc.finalDelay)
  1835  			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
  1836  		})
  1837  	}
  1838  }
  1839  
  1840  // TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
  1841  // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
  1842  // TODO(mborsz): Migrate this test to mock clock when possible.
  1843  func TestPodAddsBatching(t *testing.T) {
  1844  	type podAdd struct {
  1845  		delay time.Duration
  1846  	}
  1847  
  1848  	tests := []struct {
  1849  		name             string
  1850  		batchPeriod      time.Duration
  1851  		adds             []podAdd
  1852  		finalDelay       time.Duration
  1853  		wantRequestCount int
  1854  	}{
  1855  		{
  1856  			name:        "three adds with no batching",
  1857  			batchPeriod: 0 * time.Second,
  1858  			adds: []podAdd{
  1859  				{
  1860  					// endpoints.Run needs ~100 ms to start processing updates.
  1861  					delay: 200 * time.Millisecond,
  1862  				},
  1863  				{
  1864  					delay: 100 * time.Millisecond,
  1865  				},
  1866  				{
  1867  					delay: 100 * time.Millisecond,
  1868  				},
  1869  			},
  1870  			finalDelay:       3 * time.Second,
  1871  			wantRequestCount: 3,
  1872  		},
  1873  		{
  1874  			name:        "three adds in one batch",
  1875  			batchPeriod: 1 * time.Second,
  1876  			adds: []podAdd{
  1877  				{
  1878  					// endpoints.Run needs ~100 ms to start processing updates.
  1879  					delay: 200 * time.Millisecond,
  1880  				},
  1881  				{
  1882  					delay: 100 * time.Millisecond,
  1883  				},
  1884  				{
  1885  					delay: 100 * time.Millisecond,
  1886  				},
  1887  			},
  1888  			finalDelay:       3 * time.Second,
  1889  			wantRequestCount: 1,
  1890  		},
  1891  		{
  1892  			name:        "three adds in two batches",
  1893  			batchPeriod: 1 * time.Second,
  1894  			adds: []podAdd{
  1895  				{
  1896  					// endpoints.Run needs ~100 ms to start processing updates.
  1897  					delay: 200 * time.Millisecond,
  1898  				},
  1899  				{
  1900  					delay: 100 * time.Millisecond,
  1901  				},
  1902  				{
  1903  					delay: 1 * time.Second,
  1904  				},
  1905  			},
  1906  			finalDelay:       3 * time.Second,
  1907  			wantRequestCount: 2,
  1908  		},
  1909  	}
  1910  
  1911  	for _, tc := range tests {
  1912  		t.Run(tc.name, func(t *testing.T) {
  1913  			ns := "other"
  1914  			testServer, endpointsHandler := makeTestServer(t, ns)
  1915  			defer testServer.Close()
  1916  
  1917  			tCtx := ktesting.Init(t)
  1918  			endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
  1919  			endpoints.podsSynced = alwaysReady
  1920  			endpoints.servicesSynced = alwaysReady
  1921  			endpoints.endpointsSynced = alwaysReady
  1922  			endpoints.workerLoopPeriod = 10 * time.Millisecond
  1923  
  1924  			go endpoints.Run(tCtx, 1)
  1925  
  1926  			endpoints.serviceStore.Add(&v1.Service{
  1927  				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1928  				Spec: v1.ServiceSpec{
  1929  					Selector: map[string]string{"foo": "bar"},
  1930  					Ports:    []v1.ServicePort{{Port: 80}},
  1931  				},
  1932  			})
  1933  
  1934  			for i, add := range tc.adds {
  1935  				time.Sleep(add.delay)
  1936  
  1937  				p := testPod(ns, i, 1, true, ipv4only)
  1938  				endpoints.podStore.Add(p)
  1939  				endpoints.addPod(p)
  1940  			}
  1941  
  1942  			time.Sleep(tc.finalDelay)
  1943  			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
  1944  		})
  1945  	}
  1946  }
  1947  
  1948  // TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
  1949  // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
  1950  // TODO(mborsz): Migrate this test to mock clock when possible.
  1951  func TestPodDeleteBatching(t *testing.T) {
  1952  	type podDelete struct {
  1953  		delay   time.Duration
  1954  		podName string
  1955  	}
  1956  
  1957  	tests := []struct {
  1958  		name             string
  1959  		batchPeriod      time.Duration
  1960  		podsCount        int
  1961  		deletes          []podDelete
  1962  		finalDelay       time.Duration
  1963  		wantRequestCount int
  1964  	}{
  1965  		{
  1966  			name:        "three deletes with no batching",
  1967  			batchPeriod: 0 * time.Second,
  1968  			podsCount:   10,
  1969  			deletes: []podDelete{
  1970  				{
  1971  					// endpoints.Run needs ~100 ms to start processing updates.
  1972  					delay:   200 * time.Millisecond,
  1973  					podName: "pod0",
  1974  				},
  1975  				{
  1976  					delay:   100 * time.Millisecond,
  1977  					podName: "pod1",
  1978  				},
  1979  				{
  1980  					delay:   100 * time.Millisecond,
  1981  					podName: "pod2",
  1982  				},
  1983  			},
  1984  			finalDelay:       3 * time.Second,
  1985  			wantRequestCount: 3,
  1986  		},
  1987  		{
  1988  			name:        "three deletes in one batch",
  1989  			batchPeriod: 1 * time.Second,
  1990  			podsCount:   10,
  1991  			deletes: []podDelete{
  1992  				{
  1993  					// endpoints.Run needs ~100 ms to start processing updates.
  1994  					delay:   200 * time.Millisecond,
  1995  					podName: "pod0",
  1996  				},
  1997  				{
  1998  					delay:   100 * time.Millisecond,
  1999  					podName: "pod1",
  2000  				},
  2001  				{
  2002  					delay:   100 * time.Millisecond,
  2003  					podName: "pod2",
  2004  				},
  2005  			},
  2006  			finalDelay:       3 * time.Second,
  2007  			wantRequestCount: 1,
  2008  		},
  2009  		{
  2010  			name:        "three deletes in two batches",
  2011  			batchPeriod: 1 * time.Second,
  2012  			podsCount:   10,
  2013  			deletes: []podDelete{
  2014  				{
  2015  					// endpoints.Run needs ~100 ms to start processing updates.
  2016  					delay:   200 * time.Millisecond,
  2017  					podName: "pod0",
  2018  				},
  2019  				{
  2020  					delay:   100 * time.Millisecond,
  2021  					podName: "pod1",
  2022  				},
  2023  				{
  2024  					delay:   1 * time.Second,
  2025  					podName: "pod2",
  2026  				},
  2027  			},
  2028  			finalDelay:       3 * time.Second,
  2029  			wantRequestCount: 2,
  2030  		},
  2031  	}
  2032  
  2033  	for _, tc := range tests {
  2034  		t.Run(tc.name, func(t *testing.T) {
  2035  			ns := "other"
  2036  			testServer, endpointsHandler := makeTestServer(t, ns)
  2037  			defer testServer.Close()
  2038  
  2039  			tCtx := ktesting.Init(t)
  2040  			endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
  2041  			endpoints.podsSynced = alwaysReady
  2042  			endpoints.servicesSynced = alwaysReady
  2043  			endpoints.endpointsSynced = alwaysReady
  2044  			endpoints.workerLoopPeriod = 10 * time.Millisecond
  2045  
  2046  			go endpoints.Run(tCtx, 1)
  2047  
  2048  			addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)
  2049  
  2050  			endpoints.serviceStore.Add(&v1.Service{
  2051  				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  2052  				Spec: v1.ServiceSpec{
  2053  					Selector: map[string]string{"foo": "bar"},
  2054  					Ports:    []v1.ServicePort{{Port: 80}},
  2055  				},
  2056  			})
  2057  
  2058  			for _, update := range tc.deletes {
  2059  				time.Sleep(update.delay)
  2060  
  2061  				old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
  2062  				if err != nil {
  2063  					t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
  2064  				}
  2065  				if !exists {
  2066  					t.Fatalf("Pod %q doesn't exist", update.podName)
  2067  				}
  2068  				endpoints.podStore.Delete(old)
  2069  				endpoints.deletePod(old)
  2070  			}
  2071  
  2072  			time.Sleep(tc.finalDelay)
  2073  			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
  2074  		})
  2075  	}
  2076  }
  2077  
  2078  func TestSyncEndpointsServiceNotFound(t *testing.T) {
  2079  	ns := metav1.NamespaceDefault
  2080  	testServer, endpointsHandler := makeTestServer(t, ns)
  2081  	defer testServer.Close()
  2082  
  2083  	tCtx := ktesting.Init(t)
  2084  	endpoints := newController(tCtx, testServer.URL, 0)
  2085  	endpoints.endpointsStore.Add(&v1.Endpoints{
  2086  		ObjectMeta: metav1.ObjectMeta{
  2087  			Name:            "foo",
  2088  			Namespace:       ns,
  2089  			ResourceVersion: "1",
  2090  		},
  2091  	})
  2092  	err := endpoints.syncService(tCtx, ns+"/foo")
  2093  	if err != nil {
  2094  		t.Errorf("Unexpected error syncing service %v", err)
  2095  	}
  2096  	endpointsHandler.ValidateRequestCount(t, 1)
  2097  	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil)
  2098  }
  2099  
  2100  func TestSyncServiceOverCapacity(t *testing.T) {
  2101  	testCases := []struct {
  2102  		name                string
  2103  		startingAnnotation  *string
  2104  		numExisting         int
  2105  		numDesired          int
  2106  		numDesiredNotReady  int
  2107  		numExpectedReady    int
  2108  		numExpectedNotReady int
  2109  		expectedAnnotation  bool
  2110  	}{{
  2111  		name:                "empty",
  2112  		startingAnnotation:  nil,
  2113  		numExisting:         0,
  2114  		numDesired:          0,
  2115  		numExpectedReady:    0,
  2116  		numExpectedNotReady: 0,
  2117  		expectedAnnotation:  false,
  2118  	}, {
  2119  		name:                "annotation added past capacity, < than maxCapacity of Ready Addresses",
  2120  		startingAnnotation:  nil,
  2121  		numExisting:         maxCapacity - 1,
  2122  		numDesired:          maxCapacity - 3,
  2123  		numDesiredNotReady:  4,
  2124  		numExpectedReady:    maxCapacity - 3,
  2125  		numExpectedNotReady: 3,
  2126  		expectedAnnotation:  true,
  2127  	}, {
  2128  		name:                "annotation added past capacity, maxCapacity of Ready Addresses ",
  2129  		startingAnnotation:  nil,
  2130  		numExisting:         maxCapacity - 1,
  2131  		numDesired:          maxCapacity,
  2132  		numDesiredNotReady:  10,
  2133  		numExpectedReady:    maxCapacity,
  2134  		numExpectedNotReady: 0,
  2135  		expectedAnnotation:  true,
  2136  	}, {
  2137  		name:                "annotation removed below capacity",
  2138  		startingAnnotation:  pointer.String("truncated"),
  2139  		numExisting:         maxCapacity - 1,
  2140  		numDesired:          maxCapacity - 1,
  2141  		numDesiredNotReady:  0,
  2142  		numExpectedReady:    maxCapacity - 1,
  2143  		numExpectedNotReady: 0,
  2144  		expectedAnnotation:  false,
  2145  	}, {
  2146  		name:                "annotation was set to warning previously, annotation removed at capacity",
  2147  		startingAnnotation:  pointer.String("warning"),
  2148  		numExisting:         maxCapacity,
  2149  		numDesired:          maxCapacity,
  2150  		numDesiredNotReady:  0,
  2151  		numExpectedReady:    maxCapacity,
  2152  		numExpectedNotReady: 0,
  2153  		expectedAnnotation:  false,
  2154  	}, {
  2155  		name:                "annotation was set to warning previously but still over capacity",
  2156  		startingAnnotation:  pointer.String("warning"),
  2157  		numExisting:         maxCapacity + 1,
  2158  		numDesired:          maxCapacity + 1,
  2159  		numDesiredNotReady:  0,
  2160  		numExpectedReady:    maxCapacity,
  2161  		numExpectedNotReady: 0,
  2162  		expectedAnnotation:  true,
  2163  	}, {
  2164  		name:                "annotation removed at capacity",
  2165  		startingAnnotation:  pointer.String("truncated"),
  2166  		numExisting:         maxCapacity,
  2167  		numDesired:          maxCapacity,
  2168  		numDesiredNotReady:  0,
  2169  		numExpectedReady:    maxCapacity,
  2170  		numExpectedNotReady: 0,
  2171  		expectedAnnotation:  false,
  2172  	}, {
  2173  		name:                "no endpoints change, annotation value corrected",
  2174  		startingAnnotation:  pointer.String("invalid"),
  2175  		numExisting:         maxCapacity + 1,
  2176  		numDesired:          maxCapacity + 1,
  2177  		numDesiredNotReady:  0,
  2178  		numExpectedReady:    maxCapacity,
  2179  		numExpectedNotReady: 0,
  2180  		expectedAnnotation:  true,
  2181  	}}
  2182  
  2183  	for _, tc := range testCases {
  2184  		t.Run(tc.name, func(t *testing.T) {
  2185  			tCtx := ktesting.Init(t)
  2186  			ns := "test"
  2187  			client, c := newFakeController(tCtx, 0*time.Second)
  2188  
  2189  			addPods(c.podStore, ns, tc.numDesired, 1, tc.numDesiredNotReady, ipv4only)
  2190  			pods := c.podStore.List()
  2191  
  2192  			svc := &v1.Service{
  2193  				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  2194  				Spec: v1.ServiceSpec{
  2195  					Selector: map[string]string{"foo": "bar"},
  2196  					Ports:    []v1.ServicePort{{Port: 80}},
  2197  				},
  2198  			}
  2199  			c.serviceStore.Add(svc)
  2200  
  2201  			subset := v1.EndpointSubset{}
  2202  			for i := 0; i < tc.numExisting; i++ {
  2203  				pod := pods[i].(*v1.Pod)
  2204  				epa, _ := podToEndpointAddressForService(svc, pod)
  2205  				subset.Addresses = append(subset.Addresses, *epa)
  2206  			}
  2207  			endpoints := &v1.Endpoints{
  2208  				ObjectMeta: metav1.ObjectMeta{
  2209  					Name:            svc.Name,
  2210  					Namespace:       ns,
  2211  					ResourceVersion: "1",
  2212  					Annotations:     map[string]string{},
  2213  				},
  2214  				Subsets: []v1.EndpointSubset{subset},
  2215  			}
  2216  			if tc.startingAnnotation != nil {
  2217  				endpoints.Annotations[v1.EndpointsOverCapacity] = *tc.startingAnnotation
  2218  			}
  2219  			c.endpointsStore.Add(endpoints)
  2220  			_, err := client.CoreV1().Endpoints(ns).Create(tCtx, endpoints, metav1.CreateOptions{})
  2221  			if err != nil {
  2222  				t.Fatalf("unexpected error creating endpoints: %v", err)
  2223  			}
  2224  
  2225  			err = c.syncService(tCtx, fmt.Sprintf("%s/%s", ns, svc.Name))
  2226  			if err != nil {
  2227  				t.Errorf("Unexpected error syncing service %v", err)
  2228  			}
  2229  
  2230  			actualEndpoints, err := client.CoreV1().Endpoints(ns).Get(tCtx, endpoints.Name, metav1.GetOptions{})
  2231  			if err != nil {
  2232  				t.Fatalf("unexpected error getting endpoints: %v", err)
  2233  			}
  2234  
  2235  			actualAnnotation, ok := actualEndpoints.Annotations[v1.EndpointsOverCapacity]
  2236  			if tc.expectedAnnotation {
  2237  				if !ok {
  2238  					t.Errorf("Expected EndpointsOverCapacity annotation to be set")
  2239  				} else if actualAnnotation != "truncated" {
  2240  					t.Errorf("Expected EndpointsOverCapacity annotation to be 'truncated', got %s", actualAnnotation)
  2241  				}
  2242  			} else {
  2243  				if ok {
  2244  					t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation)
  2245  				}
  2246  			}
  2247  			numActualReady := 0
  2248  			numActualNotReady := 0
  2249  			for _, subset := range actualEndpoints.Subsets {
  2250  				numActualReady += len(subset.Addresses)
  2251  				numActualNotReady += len(subset.NotReadyAddresses)
  2252  			}
  2253  			if numActualReady != tc.numExpectedReady {
  2254  				t.Errorf("Unexpected number of actual ready Endpoints: got %d endpoints, want %d endpoints", numActualReady, tc.numExpectedReady)
  2255  			}
  2256  			if numActualNotReady != tc.numExpectedNotReady {
  2257  				t.Errorf("Unexpected number of actual not ready Endpoints: got %d endpoints, want %d endpoints", numActualNotReady, tc.numExpectedNotReady)
  2258  			}
  2259  		})
  2260  	}
  2261  }
  2262  
  2263  func TestTruncateEndpoints(t *testing.T) {
  2264  	testCases := []struct {
  2265  		desc string
  2266  		// subsetsReady, subsetsNotReady, expectedReady, expectedNotReady
  2267  		// must all be the same length
  2268  		subsetsReady     []int
  2269  		subsetsNotReady  []int
  2270  		expectedReady    []int
  2271  		expectedNotReady []int
  2272  	}{{
  2273  		desc:             "empty",
  2274  		subsetsReady:     []int{},
  2275  		subsetsNotReady:  []int{},
  2276  		expectedReady:    []int{},
  2277  		expectedNotReady: []int{},
  2278  	}, {
  2279  		desc:             "total endpoints < max capacity",
  2280  		subsetsReady:     []int{50, 100, 100, 100, 100},
  2281  		subsetsNotReady:  []int{50, 100, 100, 100, 100},
  2282  		expectedReady:    []int{50, 100, 100, 100, 100},
  2283  		expectedNotReady: []int{50, 100, 100, 100, 100},
  2284  	}, {
  2285  		desc:             "total endpoints = max capacity",
  2286  		subsetsReady:     []int{100, 100, 100, 100, 100},
  2287  		subsetsNotReady:  []int{100, 100, 100, 100, 100},
  2288  		expectedReady:    []int{100, 100, 100, 100, 100},
  2289  		expectedNotReady: []int{100, 100, 100, 100, 100},
  2290  	}, {
  2291  		desc:             "total ready endpoints < max capacity, but total endpoints > max capacity",
  2292  		subsetsReady:     []int{90, 110, 50, 10, 20},
  2293  		subsetsNotReady:  []int{101, 200, 200, 201, 298},
  2294  		expectedReady:    []int{90, 110, 50, 10, 20},
  2295  		expectedNotReady: []int{73, 144, 144, 145, 214},
  2296  	}, {
  2297  		desc:             "total ready endpoints > max capacity",
  2298  		subsetsReady:     []int{205, 400, 402, 400, 693},
  2299  		subsetsNotReady:  []int{100, 200, 200, 200, 300},
  2300  		expectedReady:    []int{98, 191, 192, 191, 328},
  2301  		expectedNotReady: []int{0, 0, 0, 0, 0},
  2302  	}}
  2303  
  2304  	for _, tc := range testCases {
  2305  		t.Run(tc.desc, func(t *testing.T) {
  2306  			var subsets []v1.EndpointSubset
  2307  			for subsetIndex, numReady := range tc.subsetsReady {
  2308  				subset := v1.EndpointSubset{}
  2309  				for i := 0; i < numReady; i++ {
  2310  					subset.Addresses = append(subset.Addresses, v1.EndpointAddress{})
  2311  				}
  2312  
  2313  				numNotReady := tc.subsetsNotReady[subsetIndex]
  2314  				for i := 0; i < numNotReady; i++ {
  2315  					subset.NotReadyAddresses = append(subset.NotReadyAddresses, v1.EndpointAddress{})
  2316  				}
  2317  				subsets = append(subsets, subset)
  2318  			}
  2319  
  2320  			endpoints := &v1.Endpoints{Subsets: subsets}
  2321  			truncateEndpoints(endpoints)
  2322  
  2323  			for i, subset := range endpoints.Subsets {
  2324  				if len(subset.Addresses) != tc.expectedReady[i] {
  2325  					t.Errorf("Unexpected number of actual ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.Addresses), tc.expectedReady[i])
  2326  				}
  2327  				if len(subset.NotReadyAddresses) != tc.expectedNotReady[i] {
  2328  					t.Errorf("Unexpected number of actual not ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.NotReadyAddresses), tc.expectedNotReady[i])
  2329  				}
  2330  			}
  2331  		})
  2332  	}
  2333  }
  2334  
  2335  func TestEndpointPortFromServicePort(t *testing.T) {
  2336  	http := pointer.String("http")
  2337  	testCases := map[string]struct {
  2338  		serviceAppProtocol           *string
  2339  		expectedEndpointsAppProtocol *string
  2340  	}{
  2341  		"empty app protocol": {
  2342  			serviceAppProtocol:           nil,
  2343  			expectedEndpointsAppProtocol: nil,
  2344  		},
  2345  		"http app protocol": {
  2346  			serviceAppProtocol:           http,
  2347  			expectedEndpointsAppProtocol: http,
  2348  		},
  2349  	}
  2350  
  2351  	for name, tc := range testCases {
  2352  		t.Run(name, func(t *testing.T) {
  2353  			epp := endpointPortFromServicePort(&v1.ServicePort{Name: "test", AppProtocol: tc.serviceAppProtocol}, 80)
  2354  
  2355  			if epp.AppProtocol != tc.expectedEndpointsAppProtocol {
  2356  				t.Errorf("Expected Endpoints AppProtocol to be %s, got %s", stringVal(tc.expectedEndpointsAppProtocol), stringVal(epp.AppProtocol))
  2357  			}
  2358  		})
  2359  	}
  2360  }
  2361  
  2362  // TestMultipleServiceChanges tests that endpoints that are not created because of an out of sync endpoints cache are eventually recreated
  2363  // A service will be created. After the endpoints exist, the service will be deleted and the endpoints will not be deleted from the cache immediately.
  2364  // After the service is recreated, the endpoints will be deleted replicating an out of sync cache. Expect that eventually the endpoints will be recreated.
  2365  func TestMultipleServiceChanges(t *testing.T) {
  2366  	ns := metav1.NamespaceDefault
  2367  	expectedSubsets := []v1.EndpointSubset{{
  2368  		Addresses: []v1.EndpointAddress{
  2369  			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
  2370  		},
  2371  	}}
  2372  	endpoint := &v1.Endpoints{
  2373  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
  2374  		Subsets:    expectedSubsets,
  2375  	}
  2376  
  2377  	controller := &endpointController{}
  2378  	blockDelete := make(chan struct{})
  2379  	blockNextAction := make(chan struct{})
  2380  	stopChan := make(chan struct{})
  2381  	testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
  2382  	defer testServer.Close()
  2383  
  2384  	tCtx := ktesting.Init(t)
  2385  	*controller = *newController(tCtx, testServer.URL, 0*time.Second)
  2386  	addPods(controller.podStore, ns, 1, 1, 0, ipv4only)
  2387  
  2388  	go func() { controller.Run(tCtx, 1) }()
  2389  
  2390  	svc := &v1.Service{
  2391  		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  2392  		Spec: v1.ServiceSpec{
  2393  			Selector:  map[string]string{"foo": "bar"},
  2394  			ClusterIP: "None",
  2395  			Ports:     nil,
  2396  		},
  2397  	}
  2398  
  2399  	controller.serviceStore.Add(svc)
  2400  	controller.onServiceUpdate(svc)
  2401  	// blockNextAction should eventually unblock once server gets endpoint request.
  2402  	waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Add should have caused a request to be sent to the test server")
  2403  
  2404  	controller.serviceStore.Delete(svc)
  2405  	controller.onServiceDelete(svc)
  2406  	waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Delete should have caused a request to be sent to the test server")
  2407  
  2408  	// If endpoints cache has not updated before service update is registered
  2409  	// Services add will not trigger a Create endpoint request.
  2410  	controller.serviceStore.Add(svc)
  2411  	controller.onServiceUpdate(svc)
  2412  
  2413  	// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
  2414  	wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
  2415  		return controller.queue.Len() == 0, nil
  2416  	})
  2417  
  2418  	// Cause test server to delete endpoints
  2419  	close(blockDelete)
  2420  	waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoint should have been recreated")
  2421  
  2422  	close(blockNextAction)
  2423  	close(stopChan)
  2424  }
  2425  
  2426  func TestSyncServiceAddresses(t *testing.T) {
  2427  	makeService := func(tolerateUnready bool) *v1.Service {
  2428  		return &v1.Service{
  2429  			ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"},
  2430  			Spec: v1.ServiceSpec{
  2431  				Selector:                 map[string]string{"foo": "bar"},
  2432  				PublishNotReadyAddresses: tolerateUnready,
  2433  				Type:                     v1.ServiceTypeClusterIP,
  2434  				ClusterIP:                "1.1.1.1",
  2435  				Ports:                    []v1.ServicePort{{Port: 80}},
  2436  			},
  2437  		}
  2438  	}
  2439  
  2440  	makePod := func(phase v1.PodPhase, isReady bool, terminating bool) *v1.Pod {
  2441  		statusCondition := v1.ConditionFalse
  2442  		if isReady {
  2443  			statusCondition = v1.ConditionTrue
  2444  		}
  2445  
  2446  		now := metav1.Now()
  2447  		deletionTimestamp := &now
  2448  		if !terminating {
  2449  			deletionTimestamp = nil
  2450  		}
  2451  		return &v1.Pod{
  2452  			ObjectMeta: metav1.ObjectMeta{
  2453  				Namespace:         "ns",
  2454  				Name:              "fakepod",
  2455  				DeletionTimestamp: deletionTimestamp,
  2456  				Labels:            map[string]string{"foo": "bar"},
  2457  			},
  2458  			Spec: v1.PodSpec{
  2459  				Containers: []v1.Container{{Ports: []v1.ContainerPort{
  2460  					{Name: "port1", ContainerPort: int32(8080)},
  2461  				}}},
  2462  			},
  2463  			Status: v1.PodStatus{
  2464  				Phase: phase,
  2465  				Conditions: []v1.PodCondition{
  2466  					{
  2467  						Type:   v1.PodReady,
  2468  						Status: statusCondition,
  2469  					},
  2470  				},
  2471  				PodIP: "10.1.1.1",
  2472  				PodIPs: []v1.PodIP{
  2473  					{IP: "10.1.1.1"},
  2474  				},
  2475  			},
  2476  		}
  2477  	}
  2478  
  2479  	testCases := []struct {
  2480  		name            string
  2481  		pod             *v1.Pod
  2482  		service         *v1.Service
  2483  		expectedReady   int
  2484  		expectedUnready int
  2485  	}{
  2486  		{
  2487  			name:            "pod running phase",
  2488  			pod:             makePod(v1.PodRunning, true, false),
  2489  			service:         makeService(false),
  2490  			expectedReady:   1,
  2491  			expectedUnready: 0,
  2492  		},
  2493  		{
  2494  			name:            "pod running phase being deleted",
  2495  			pod:             makePod(v1.PodRunning, true, true),
  2496  			service:         makeService(false),
  2497  			expectedReady:   0,
  2498  			expectedUnready: 0,
  2499  		},
  2500  		{
  2501  			name:            "pod unknown phase container ready",
  2502  			pod:             makePod(v1.PodUnknown, true, false),
  2503  			service:         makeService(false),
  2504  			expectedReady:   1,
  2505  			expectedUnready: 0,
  2506  		},
  2507  		{
  2508  			name:            "pod unknown phase container ready being deleted",
  2509  			pod:             makePod(v1.PodUnknown, true, true),
  2510  			service:         makeService(false),
  2511  			expectedReady:   0,
  2512  			expectedUnready: 0,
  2513  		},
  2514  		{
  2515  			name:            "pod pending phase container ready",
  2516  			pod:             makePod(v1.PodPending, true, false),
  2517  			service:         makeService(false),
  2518  			expectedReady:   1,
  2519  			expectedUnready: 0,
  2520  		},
  2521  		{
  2522  			name:            "pod pending phase container ready being deleted",
  2523  			pod:             makePod(v1.PodPending, true, true),
  2524  			service:         makeService(false),
  2525  			expectedReady:   0,
  2526  			expectedUnready: 0,
  2527  		},
  2528  		{
  2529  			name:            "pod unknown phase container not ready",
  2530  			pod:             makePod(v1.PodUnknown, false, false),
  2531  			service:         makeService(false),
  2532  			expectedReady:   0,
  2533  			expectedUnready: 1,
  2534  		},
  2535  		{
  2536  			name:            "pod pending phase container not ready",
  2537  			pod:             makePod(v1.PodPending, false, false),
  2538  			service:         makeService(false),
  2539  			expectedReady:   0,
  2540  			expectedUnready: 1,
  2541  		},
  2542  		{
  2543  			name:            "pod failed phase",
  2544  			pod:             makePod(v1.PodFailed, false, false),
  2545  			service:         makeService(false),
  2546  			expectedReady:   0,
  2547  			expectedUnready: 0,
  2548  		},
  2549  		{
  2550  			name:            "pod succeeded phase",
  2551  			pod:             makePod(v1.PodSucceeded, false, false),
  2552  			service:         makeService(false),
  2553  			expectedReady:   0,
  2554  			expectedUnready: 0,
  2555  		},
  2556  		{
  2557  			name:            "pod running phase and tolerate unready",
  2558  			pod:             makePod(v1.PodRunning, false, false),
  2559  			service:         makeService(true),
  2560  			expectedReady:   1,
  2561  			expectedUnready: 0,
  2562  		},
  2563  		{
  2564  			name:            "pod running phase and tolerate unready being deleted",
  2565  			pod:             makePod(v1.PodRunning, false, true),
  2566  			service:         makeService(true),
  2567  			expectedReady:   1,
  2568  			expectedUnready: 0,
  2569  		},
  2570  		{
  2571  			name:            "pod unknown phase and tolerate unready",
  2572  			pod:             makePod(v1.PodUnknown, false, false),
  2573  			service:         makeService(true),
  2574  			expectedReady:   1,
  2575  			expectedUnready: 0,
  2576  		},
  2577  		{
  2578  			name:            "pod unknown phase and tolerate unready being deleted",
  2579  			pod:             makePod(v1.PodUnknown, false, true),
  2580  			service:         makeService(true),
  2581  			expectedReady:   1,
  2582  			expectedUnready: 0,
  2583  		},
  2584  		{
  2585  			name:            "pod pending phase and tolerate unready",
  2586  			pod:             makePod(v1.PodPending, false, false),
  2587  			service:         makeService(true),
  2588  			expectedReady:   1,
  2589  			expectedUnready: 0,
  2590  		},
  2591  		{
  2592  			name:            "pod pending phase and tolerate unready being deleted",
  2593  			pod:             makePod(v1.PodPending, false, true),
  2594  			service:         makeService(true),
  2595  			expectedReady:   1,
  2596  			expectedUnready: 0,
  2597  		},
  2598  		{
  2599  			name:            "pod failed phase and tolerate unready",
  2600  			pod:             makePod(v1.PodFailed, false, false),
  2601  			service:         makeService(true),
  2602  			expectedReady:   0,
  2603  			expectedUnready: 0,
  2604  		},
  2605  		{
  2606  			name:            "pod succeeded phase and tolerate unready endpoints",
  2607  			pod:             makePod(v1.PodSucceeded, false, false),
  2608  			service:         makeService(true),
  2609  			expectedReady:   0,
  2610  			expectedUnready: 0,
  2611  		},
  2612  	}
  2613  
  2614  	for _, tc := range testCases {
  2615  		t.Run(tc.name, func(t *testing.T) {
  2616  			tCtx := ktesting.Init(t)
  2617  
  2618  			ns := tc.service.Namespace
  2619  			client, c := newFakeController(tCtx, 0*time.Second)
  2620  
  2621  			err := c.podStore.Add(tc.pod)
  2622  			if err != nil {
  2623  				t.Errorf("Unexpected error adding pod %v", err)
  2624  			}
  2625  			err = c.serviceStore.Add(tc.service)
  2626  			if err != nil {
  2627  				t.Errorf("Unexpected error adding service %v", err)
  2628  			}
  2629  			err = c.syncService(tCtx, fmt.Sprintf("%s/%s", ns, tc.service.Name))
  2630  			if err != nil {
  2631  				t.Errorf("Unexpected error syncing service %v", err)
  2632  			}
  2633  
  2634  			endpoints, err := client.CoreV1().Endpoints(ns).Get(tCtx, tc.service.Name, metav1.GetOptions{})
  2635  			if err != nil {
  2636  				t.Errorf("Unexpected error %v", err)
  2637  			}
  2638  
  2639  			readyEndpoints := 0
  2640  			unreadyEndpoints := 0
  2641  			for _, subset := range endpoints.Subsets {
  2642  				readyEndpoints += len(subset.Addresses)
  2643  				unreadyEndpoints += len(subset.NotReadyAddresses)
  2644  			}
  2645  
  2646  			if tc.expectedReady != readyEndpoints {
  2647  				t.Errorf("Expected %d ready endpoints, got %d", tc.expectedReady, readyEndpoints)
  2648  			}
  2649  
  2650  			if tc.expectedUnready != unreadyEndpoints {
  2651  				t.Errorf("Expected %d ready endpoints, got %d", tc.expectedUnready, unreadyEndpoints)
  2652  			}
  2653  		})
  2654  	}
  2655  }
  2656  
  2657  func TestEndpointsDeletionEvents(t *testing.T) {
  2658  	ns := metav1.NamespaceDefault
  2659  	testServer, _ := makeTestServer(t, ns)
  2660  	defer testServer.Close()
  2661  
  2662  	tCtx := ktesting.Init(t)
  2663  	controller := newController(tCtx, testServer.URL, 0)
  2664  	store := controller.endpointsStore
  2665  	ep1 := &v1.Endpoints{
  2666  		ObjectMeta: metav1.ObjectMeta{
  2667  			Name:            "foo",
  2668  			Namespace:       ns,
  2669  			ResourceVersion: "rv1",
  2670  		},
  2671  	}
  2672  
  2673  	// Test Unexpected and Expected Deletes
  2674  	store.Delete(ep1)
  2675  	controller.onEndpointsDelete(ep1)
  2676  
  2677  	if controller.queue.Len() != 1 {
  2678  		t.Errorf("Expected one service to be in the queue, found %d", controller.queue.Len())
  2679  	}
  2680  }
  2681  
  2682  func stringVal(str *string) string {
  2683  	if str == nil {
  2684  		return "nil"
  2685  	}
  2686  	return *str
  2687  }
  2688  
  2689  // waitForChanReceive blocks up to the timeout waiting for the receivingChan to receive
  2690  func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan struct{}, errorMsg string) {
  2691  	timer := time.NewTimer(timeout)
  2692  	select {
  2693  	case <-timer.C:
  2694  		t.Errorf(errorMsg)
  2695  	case <-receivingChan:
  2696  	}
  2697  }
  2698  
  2699  func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) {
  2700  	copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset {
  2701  		newSubSet := orig.DeepCopy()
  2702  		mutator(newSubSet)
  2703  		return newSubSet
  2704  	}
  2705  	es1 := &v1.EndpointSubset{
  2706  		Addresses: []v1.EndpointAddress{
  2707  			{
  2708  				IP:        "1.1.1.1",
  2709  				TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"},
  2710  			},
  2711  		},
  2712  		NotReadyAddresses: []v1.EndpointAddress{
  2713  			{
  2714  				IP:        "1.1.1.2",
  2715  				TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"},
  2716  			},
  2717  		},
  2718  		Ports: []v1.EndpointPort{{Port: 8081, Protocol: "TCP"}},
  2719  	}
  2720  	es2 := &v1.EndpointSubset{
  2721  		Addresses: []v1.EndpointAddress{
  2722  			{
  2723  				IP:        "2.2.2.1",
  2724  				TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"},
  2725  			},
  2726  		},
  2727  		NotReadyAddresses: []v1.EndpointAddress{
  2728  			{
  2729  				IP:        "2.2.2.2",
  2730  				TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"},
  2731  			},
  2732  		},
  2733  		Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}},
  2734  	}
  2735  	tests := []struct {
  2736  		name     string
  2737  		subsets1 []v1.EndpointSubset
  2738  		subsets2 []v1.EndpointSubset
  2739  		expected bool
  2740  	}{
  2741  		{
  2742  			name:     "Subsets removed",
  2743  			subsets1: []v1.EndpointSubset{*es1, *es2},
  2744  			subsets2: []v1.EndpointSubset{*es1},
  2745  			expected: false,
  2746  		},
  2747  		{
  2748  			name:     "Ready Pod IP changed",
  2749  			subsets1: []v1.EndpointSubset{*es1, *es2},
  2750  			subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
  2751  				es.Addresses[0].IP = "1.1.1.10"
  2752  			}), *es2},
  2753  			expected: false,
  2754  		},
  2755  		{
  2756  			name:     "NotReady Pod IP changed",
  2757  			subsets1: []v1.EndpointSubset{*es1, *es2},
  2758  			subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
  2759  				es.NotReadyAddresses[0].IP = "2.2.2.10"
  2760  			})},
  2761  			expected: false,
  2762  		},
  2763  		{
  2764  			name:     "Pod ResourceVersion changed",
  2765  			subsets1: []v1.EndpointSubset{*es1, *es2},
  2766  			subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
  2767  				es.Addresses[0].TargetRef.ResourceVersion = "100"
  2768  			})},
  2769  			expected: true,
  2770  		},
  2771  		{
  2772  			name:     "Pod ResourceVersion removed",
  2773  			subsets1: []v1.EndpointSubset{*es1, *es2},
  2774  			subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
  2775  				es.Addresses[0].TargetRef.ResourceVersion = ""
  2776  			})},
  2777  			expected: true,
  2778  		},
  2779  		{
  2780  			name:     "Ports changed",
  2781  			subsets1: []v1.EndpointSubset{*es1, *es2},
  2782  			subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
  2783  				es.Ports[0].Port = 8082
  2784  			})},
  2785  			expected: false,
  2786  		},
  2787  	}
  2788  	for _, tt := range tests {
  2789  		t.Run(tt.name, func(t *testing.T) {
  2790  			if got := endpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected {
  2791  				t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, tt.expected)
  2792  			}
  2793  		})
  2794  	}
  2795  }
  2796  

View as plain text