1
16
17 package endpoint
18
19 import (
20 "context"
21 "fmt"
22 "net/http"
23 "net/http/httptest"
24 "reflect"
25 "strconv"
26 "testing"
27 "time"
28
29 "github.com/google/go-cmp/cmp"
30 v1 "k8s.io/api/core/v1"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/apimachinery/pkg/runtime/schema"
34 "k8s.io/apimachinery/pkg/util/intstr"
35 "k8s.io/apimachinery/pkg/util/wait"
36 "k8s.io/client-go/informers"
37 clientset "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/kubernetes/fake"
39 clientscheme "k8s.io/client-go/kubernetes/scheme"
40 restclient "k8s.io/client-go/rest"
41 "k8s.io/client-go/tools/cache"
42 utiltesting "k8s.io/client-go/util/testing"
43 endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
44 api "k8s.io/kubernetes/pkg/apis/core"
45 controllerpkg "k8s.io/kubernetes/pkg/controller"
46 "k8s.io/kubernetes/test/utils/ktesting"
47 utilnet "k8s.io/utils/net"
48 "k8s.io/utils/pointer"
49 )
50
51 var alwaysReady = func() bool { return true }
52 var neverReady = func() bool { return false }
53 var emptyNodeName string
54 var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
55 var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
56 var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
57
58 var ipv4only = []v1.IPFamily{v1.IPv4Protocol}
59 var ipv6only = []v1.IPFamily{v1.IPv6Protocol}
60 var ipv4ipv6 = []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol}
61 var ipv6ipv4 = []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol}
62
63 func testPod(namespace string, id int, nPorts int, isReady bool, ipFamilies []v1.IPFamily) *v1.Pod {
64 p := &v1.Pod{
65 TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
66 ObjectMeta: metav1.ObjectMeta{
67 Namespace: namespace,
68 Name: fmt.Sprintf("pod%d", id),
69 Labels: map[string]string{"foo": "bar"},
70 ResourceVersion: fmt.Sprint(id),
71 },
72 Spec: v1.PodSpec{
73 Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
74 },
75 Status: v1.PodStatus{
76 Conditions: []v1.PodCondition{
77 {
78 Type: v1.PodReady,
79 Status: v1.ConditionTrue,
80 },
81 },
82 },
83 }
84 if !isReady {
85 p.Status.Conditions[0].Status = v1.ConditionFalse
86 }
87 for j := 0; j < nPorts; j++ {
88 p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
89 v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
90 }
91 for _, family := range ipFamilies {
92 var ip string
93 if family == v1.IPv4Protocol {
94 ip = fmt.Sprintf("1.2.3.%d", 4+id)
95 } else {
96 ip = fmt.Sprintf("2000::%d", 4+id)
97 }
98 p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: ip})
99 }
100 p.Status.PodIP = p.Status.PodIPs[0].IP
101
102 return p
103 }
104
105 func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, ipFamilies []v1.IPFamily) {
106 for i := 0; i < nPods+nNotReady; i++ {
107 isReady := i < nPods
108 pod := testPod(namespace, i, nPorts, isReady, ipFamilies)
109 store.Add(pod)
110 }
111 }
112
113 func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
114 for i := 0; i < nPods; i++ {
115 p := &v1.Pod{
116 TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
117 ObjectMeta: metav1.ObjectMeta{
118 Namespace: namespace,
119 Name: fmt.Sprintf("pod%d", i),
120 Labels: map[string]string{"foo": "bar"},
121 },
122 Spec: v1.PodSpec{
123 RestartPolicy: restartPolicy,
124 Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
125 },
126 Status: v1.PodStatus{
127 PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
128 Phase: podPhase,
129 Conditions: []v1.PodCondition{
130 {
131 Type: v1.PodReady,
132 Status: v1.ConditionFalse,
133 },
134 },
135 },
136 }
137 for j := 0; j < nPorts; j++ {
138 p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
139 v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
140 }
141 store.Add(p)
142 }
143 }
144
145 func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
146 fakeEndpointsHandler := utiltesting.FakeHandler{
147 StatusCode: http.StatusOK,
148 ResponseBody: runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}),
149 }
150 mux := http.NewServeMux()
151 if namespace == "" {
152 t.Fatal("namespace cannot be empty")
153 }
154 mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints", &fakeEndpointsHandler)
155 mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints/", &fakeEndpointsHandler)
156 mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
157 t.Errorf("unexpected request: %v", req.RequestURI)
158 http.Error(res, "", http.StatusNotFound)
159 })
160 return httptest.NewServer(mux), &fakeEndpointsHandler
161 }
162
163
164
165
166 func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
167
168 handlerFunc := func(res http.ResponseWriter, req *http.Request) {
169 if controller == nil {
170 res.WriteHeader(http.StatusInternalServerError)
171 res.Write([]byte("controller has not been set yet"))
172 return
173 }
174
175 if req.Method == "POST" {
176 controller.endpointsStore.Add(endpoint)
177 blockNextAction <- struct{}{}
178 }
179
180 if req.Method == "DELETE" {
181 go func() {
182
183 <-blockDelete
184 controller.endpointsStore.Delete(endpoint)
185 controller.onEndpointsDelete(endpoint)
186 }()
187 blockNextAction <- struct{}{}
188 }
189
190 res.WriteHeader(http.StatusOK)
191 res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{})))
192 }
193
194 mux := http.NewServeMux()
195 mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints", handlerFunc)
196 mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints/", handlerFunc)
197 mux.HandleFunc("/api/v1/namespaces/"+namespace+"/events", func(res http.ResponseWriter, req *http.Request) {})
198 mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
199 t.Errorf("unexpected request: %v", req.RequestURI)
200 http.Error(res, "", http.StatusNotFound)
201 })
202 return httptest.NewServer(mux)
203
204 }
205
206 type endpointController struct {
207 *Controller
208 podStore cache.Store
209 serviceStore cache.Store
210 endpointsStore cache.Store
211 }
212
213 func newController(ctx context.Context, url string, batchPeriod time.Duration) *endpointController {
214 client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
215 informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc())
216 endpoints := NewEndpointController(ctx, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
217 informerFactory.Core().V1().Endpoints(), client, batchPeriod)
218 endpoints.podsSynced = alwaysReady
219 endpoints.servicesSynced = alwaysReady
220 endpoints.endpointsSynced = alwaysReady
221 return &endpointController{
222 endpoints,
223 informerFactory.Core().V1().Pods().Informer().GetStore(),
224 informerFactory.Core().V1().Services().Informer().GetStore(),
225 informerFactory.Core().V1().Endpoints().Informer().GetStore(),
226 }
227 }
228
229 func newFakeController(ctx context.Context, batchPeriod time.Duration) (*fake.Clientset, *endpointController) {
230 client := fake.NewSimpleClientset()
231 informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc())
232
233 eController := NewEndpointController(
234 ctx,
235 informerFactory.Core().V1().Pods(),
236 informerFactory.Core().V1().Services(),
237 informerFactory.Core().V1().Endpoints(),
238 client,
239 batchPeriod)
240
241 eController.podsSynced = alwaysReady
242 eController.servicesSynced = alwaysReady
243 eController.endpointsSynced = alwaysReady
244
245 return client, &endpointController{
246 eController,
247 informerFactory.Core().V1().Pods().Informer().GetStore(),
248 informerFactory.Core().V1().Services().Informer().GetStore(),
249 informerFactory.Core().V1().Endpoints().Informer().GetStore(),
250 }
251 }
252
253 func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
254 ns := metav1.NamespaceDefault
255 testServer, endpointsHandler := makeTestServer(t, ns)
256 defer testServer.Close()
257
258 tCtx := ktesting.Init(t)
259 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
260 endpoints.endpointsStore.Add(&v1.Endpoints{
261 ObjectMeta: metav1.ObjectMeta{
262 Name: "foo",
263 Namespace: ns,
264 ResourceVersion: "1",
265 },
266 Subsets: []v1.EndpointSubset{{
267 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
268 Ports: []v1.EndpointPort{{Port: 1000}},
269 }},
270 })
271 endpoints.serviceStore.Add(&v1.Service{
272 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
273 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
274 })
275 err := endpoints.syncService(tCtx, ns+"/foo")
276 if err != nil {
277 t.Errorf("Unexpected error syncing service %v", err)
278 }
279 endpointsHandler.ValidateRequestCount(t, 0)
280 }
281
282 func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
283 ns := metav1.NamespaceDefault
284 testServer, endpointsHandler := makeTestServer(t, ns)
285 defer testServer.Close()
286
287 tCtx := ktesting.Init(t)
288 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
289 endpoints.endpointsStore.Add(&v1.Endpoints{
290 ObjectMeta: metav1.ObjectMeta{
291 Name: "foo",
292 Namespace: ns,
293 ResourceVersion: "1",
294 },
295 Subsets: nil,
296 })
297 endpoints.serviceStore.Add(&v1.Service{
298 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
299 Spec: v1.ServiceSpec{
300 Selector: map[string]string{"foo": "bar"},
301 Ports: []v1.ServicePort{{Port: 80}},
302 },
303 })
304 err := endpoints.syncService(tCtx, ns+"/foo")
305 if err != nil {
306 t.Errorf("Unexpected error syncing service %v", err)
307 }
308 endpointsHandler.ValidateRequestCount(t, 0)
309 }
310
311 func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
312 ns := metav1.NamespaceDefault
313 testServer, endpointsHandler := makeTestServer(t, ns)
314 defer testServer.Close()
315
316 tCtx := ktesting.Init(t)
317 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
318 endpoints.endpointsStore.Add(&v1.Endpoints{
319 ObjectMeta: metav1.ObjectMeta{
320 Name: "foo",
321 Namespace: ns,
322 ResourceVersion: "1",
323 },
324 Subsets: []v1.EndpointSubset{},
325 })
326 endpoints.serviceStore.Add(&v1.Service{
327 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
328 Spec: v1.ServiceSpec{
329 Selector: map[string]string{"foo": "bar"},
330 Ports: []v1.ServicePort{{Port: 80}},
331 },
332 })
333 err := endpoints.syncService(tCtx, ns+"/foo")
334 if err != nil {
335 t.Errorf("Unexpected error syncing service %v", err)
336 }
337 endpointsHandler.ValidateRequestCount(t, 0)
338 }
339
340 func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) {
341 ns := metav1.NamespaceDefault
342 testServer, endpointsHandler := makeTestServer(t, ns)
343 defer testServer.Close()
344 pod0 := testPod(ns, 0, 1, true, ipv4only)
345 pod1 := testPod(ns, 1, 1, false, ipv4only)
346 tCtx := ktesting.Init(t)
347 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
348 endpoints.endpointsStore.Add(&v1.Endpoints{
349 ObjectMeta: metav1.ObjectMeta{
350 Name: "foo",
351 Namespace: ns,
352 ResourceVersion: "1",
353 },
354 Subsets: []v1.EndpointSubset{{
355 Addresses: []v1.EndpointAddress{
356 {
357 IP: pod0.Status.PodIPs[0].IP,
358 NodeName: &emptyNodeName,
359 TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod0.Name, Namespace: ns, ResourceVersion: "1"},
360 },
361 },
362 NotReadyAddresses: []v1.EndpointAddress{
363 {
364 IP: pod1.Status.PodIPs[0].IP,
365 NodeName: &emptyNodeName,
366 TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod1.Name, Namespace: ns, ResourceVersion: "2"},
367 },
368 },
369 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
370 }},
371 })
372 endpoints.serviceStore.Add(&v1.Service{
373 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
374 Spec: v1.ServiceSpec{
375 Selector: map[string]string{"foo": "bar"},
376 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
377 },
378 })
379 pod0.ResourceVersion = "3"
380 pod1.ResourceVersion = "4"
381 endpoints.podStore.Add(pod0)
382 endpoints.podStore.Add(pod1)
383 err := endpoints.syncService(tCtx, ns+"/foo")
384 if err != nil {
385 t.Errorf("Unexpected error syncing service %v", err)
386 }
387
388 endpointsHandler.ValidateRequestCount(t, 0)
389 }
390
391 func TestSyncEndpointsNewNoSubsets(t *testing.T) {
392 ns := metav1.NamespaceDefault
393 testServer, endpointsHandler := makeTestServer(t, ns)
394 defer testServer.Close()
395 tCtx := ktesting.Init(t)
396 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
397 endpoints.serviceStore.Add(&v1.Service{
398 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
399 Spec: v1.ServiceSpec{
400 Selector: map[string]string{"foo": "bar"},
401 Ports: []v1.ServicePort{{Port: 80}},
402 },
403 })
404 err := endpoints.syncService(tCtx, ns+"/foo")
405 if err != nil {
406 t.Errorf("Unexpected error syncing service %v", err)
407 }
408 endpointsHandler.ValidateRequestCount(t, 1)
409 }
410
411 func TestCheckLeftoverEndpoints(t *testing.T) {
412 ns := metav1.NamespaceDefault
413 testServer, _ := makeTestServer(t, ns)
414 defer testServer.Close()
415
416 tCtx := ktesting.Init(t)
417 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
418 endpoints.endpointsStore.Add(&v1.Endpoints{
419 ObjectMeta: metav1.ObjectMeta{
420 Name: "foo",
421 Namespace: ns,
422 ResourceVersion: "1",
423 },
424 Subsets: []v1.EndpointSubset{{
425 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
426 Ports: []v1.EndpointPort{{Port: 1000}},
427 }},
428 })
429 endpoints.checkLeftoverEndpoints()
430 if e, a := 1, endpoints.queue.Len(); e != a {
431 t.Fatalf("Expected %v, got %v", e, a)
432 }
433 got, _ := endpoints.queue.Get()
434 if e, a := ns+"/foo", got; e != a {
435 t.Errorf("Expected %v, got %v", e, a)
436 }
437 }
438
439 func TestSyncEndpointsProtocolTCP(t *testing.T) {
440 ns := "other"
441 testServer, endpointsHandler := makeTestServer(t, ns)
442 defer testServer.Close()
443 tCtx := ktesting.Init(t)
444 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
445 endpoints.endpointsStore.Add(&v1.Endpoints{
446 ObjectMeta: metav1.ObjectMeta{
447 Name: "foo",
448 Namespace: ns,
449 ResourceVersion: "1",
450 },
451 Subsets: []v1.EndpointSubset{{
452 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
453 Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
454 }},
455 })
456 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
457 endpoints.serviceStore.Add(&v1.Service{
458 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
459 Spec: v1.ServiceSpec{
460 Selector: map[string]string{},
461 Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
462 },
463 })
464 err := endpoints.syncService(tCtx, ns+"/foo")
465 if err != nil {
466 t.Errorf("Unexpected error syncing service %v", err)
467 }
468
469 endpointsHandler.ValidateRequestCount(t, 1)
470 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
471 ObjectMeta: metav1.ObjectMeta{
472 Name: "foo",
473 Namespace: ns,
474 ResourceVersion: "1",
475 Labels: map[string]string{
476 v1.IsHeadlessService: "",
477 },
478 },
479 Subsets: []v1.EndpointSubset{{
480 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
481 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
482 }},
483 })
484 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
485 }
486
487 func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) {
488 ns := metav1.NamespaceDefault
489 testServer, endpointsHandler := makeTestServer(t, ns)
490 defer testServer.Close()
491 tCtx := ktesting.Init(t)
492 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
493 endpoints.endpointsStore.Add(&v1.Endpoints{
494 ObjectMeta: metav1.ObjectMeta{
495 Name: "foo",
496 Namespace: ns,
497 ResourceVersion: "1",
498 Labels: map[string]string{
499 v1.IsHeadlessService: "",
500 },
501 },
502 Subsets: []v1.EndpointSubset{},
503 })
504 endpoints.serviceStore.Add(&v1.Service{
505 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
506 Spec: v1.ServiceSpec{
507 Selector: map[string]string{"foo": "bar"},
508 Ports: []v1.ServicePort{{Port: 80}},
509 },
510 })
511 err := endpoints.syncService(tCtx, ns+"/foo")
512 if err != nil {
513 t.Errorf("Unexpected error syncing service %v", err)
514 }
515
516 endpointsHandler.ValidateRequestCount(t, 0)
517 }
518
519 func TestSyncServiceExternalNameType(t *testing.T) {
520 serviceName := "testing-1"
521 namespace := metav1.NamespaceDefault
522
523 testCases := []struct {
524 desc string
525 service *v1.Service
526 }{
527 {
528 desc: "External name with selector and ports should not receive endpoints",
529 service: &v1.Service{
530 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
531 Spec: v1.ServiceSpec{
532 Selector: map[string]string{"foo": "bar"},
533 Ports: []v1.ServicePort{{Port: 80}},
534 Type: v1.ServiceTypeExternalName,
535 },
536 },
537 },
538 {
539 desc: "External name with ports should not receive endpoints",
540 service: &v1.Service{
541 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
542 Spec: v1.ServiceSpec{
543 Ports: []v1.ServicePort{{Port: 80}},
544 Type: v1.ServiceTypeExternalName,
545 },
546 },
547 },
548 {
549 desc: "External name with selector should not receive endpoints",
550 service: &v1.Service{
551 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
552 Spec: v1.ServiceSpec{
553 Selector: map[string]string{"foo": "bar"},
554 Type: v1.ServiceTypeExternalName,
555 },
556 },
557 },
558 {
559 desc: "External name without selector and ports should not receive endpoints",
560 service: &v1.Service{
561 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
562 Spec: v1.ServiceSpec{
563 Type: v1.ServiceTypeExternalName,
564 },
565 },
566 },
567 }
568
569 for _, tc := range testCases {
570 t.Run(tc.desc, func(t *testing.T) {
571 testServer, endpointsHandler := makeTestServer(t, namespace)
572
573 defer testServer.Close()
574 tCtx := ktesting.Init(t)
575 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
576 err := endpoints.serviceStore.Add(tc.service)
577 if err != nil {
578 t.Fatalf("Error adding service to service store: %v", err)
579 }
580 err = endpoints.syncService(tCtx, namespace+"/"+serviceName)
581 if err != nil {
582 t.Fatalf("Error syncing service: %v", err)
583 }
584 endpointsHandler.ValidateRequestCount(t, 0)
585 })
586 }
587 }
588
589 func TestSyncEndpointsProtocolUDP(t *testing.T) {
590 ns := "other"
591 testServer, endpointsHandler := makeTestServer(t, ns)
592 defer testServer.Close()
593 tCtx := ktesting.Init(t)
594 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
595 endpoints.endpointsStore.Add(&v1.Endpoints{
596 ObjectMeta: metav1.ObjectMeta{
597 Name: "foo",
598 Namespace: ns,
599 ResourceVersion: "1",
600 },
601 Subsets: []v1.EndpointSubset{{
602 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
603 Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
604 }},
605 })
606 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
607 endpoints.serviceStore.Add(&v1.Service{
608 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
609 Spec: v1.ServiceSpec{
610 Selector: map[string]string{},
611 Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "UDP"}},
612 },
613 })
614 err := endpoints.syncService(tCtx, ns+"/foo")
615 if err != nil {
616 t.Errorf("Unexpected error syncing service %v", err)
617 }
618
619 endpointsHandler.ValidateRequestCount(t, 1)
620 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
621 ObjectMeta: metav1.ObjectMeta{
622 Name: "foo",
623 Namespace: ns,
624 ResourceVersion: "1",
625 Labels: map[string]string{
626 v1.IsHeadlessService: "",
627 },
628 },
629 Subsets: []v1.EndpointSubset{{
630 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
631 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
632 }},
633 })
634 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
635 }
636
637 func TestSyncEndpointsProtocolSCTP(t *testing.T) {
638 ns := "other"
639 testServer, endpointsHandler := makeTestServer(t, ns)
640 defer testServer.Close()
641 tCtx := ktesting.Init(t)
642 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
643 endpoints.endpointsStore.Add(&v1.Endpoints{
644 ObjectMeta: metav1.ObjectMeta{
645 Name: "foo",
646 Namespace: ns,
647 ResourceVersion: "1",
648 },
649 Subsets: []v1.EndpointSubset{{
650 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
651 Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
652 }},
653 })
654 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
655 endpoints.serviceStore.Add(&v1.Service{
656 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
657 Spec: v1.ServiceSpec{
658 Selector: map[string]string{},
659 Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "SCTP"}},
660 },
661 })
662 err := endpoints.syncService(tCtx, ns+"/foo")
663 if err != nil {
664 t.Errorf("Unexpected error syncing service %v", err)
665 }
666
667 endpointsHandler.ValidateRequestCount(t, 1)
668 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
669 ObjectMeta: metav1.ObjectMeta{
670 Name: "foo",
671 Namespace: ns,
672 ResourceVersion: "1",
673 Labels: map[string]string{
674 v1.IsHeadlessService: "",
675 },
676 },
677 Subsets: []v1.EndpointSubset{{
678 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
679 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
680 }},
681 })
682 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
683 }
684
685 func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
686 ns := "other"
687 testServer, endpointsHandler := makeTestServer(t, ns)
688 defer testServer.Close()
689 tCtx := ktesting.Init(t)
690 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
691 endpoints.endpointsStore.Add(&v1.Endpoints{
692 ObjectMeta: metav1.ObjectMeta{
693 Name: "foo",
694 Namespace: ns,
695 ResourceVersion: "1",
696 },
697 Subsets: []v1.EndpointSubset{},
698 })
699 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
700 endpoints.serviceStore.Add(&v1.Service{
701 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
702 Spec: v1.ServiceSpec{
703 Selector: map[string]string{},
704 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
705 },
706 })
707 err := endpoints.syncService(tCtx, ns+"/foo")
708 if err != nil {
709 t.Errorf("Unexpected error syncing service %v", err)
710 }
711
712 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
713 ObjectMeta: metav1.ObjectMeta{
714 Name: "foo",
715 Namespace: ns,
716 ResourceVersion: "1",
717 Labels: map[string]string{
718 v1.IsHeadlessService: "",
719 },
720 },
721 Subsets: []v1.EndpointSubset{{
722 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
723 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
724 }},
725 })
726 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
727 }
728
729 func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
730 ns := "other"
731 testServer, endpointsHandler := makeTestServer(t, ns)
732 defer testServer.Close()
733
734 tCtx := ktesting.Init(t)
735 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
736 endpoints.endpointsStore.Add(&v1.Endpoints{
737 ObjectMeta: metav1.ObjectMeta{
738 Name: "foo",
739 Namespace: ns,
740 ResourceVersion: "1",
741 },
742 Subsets: []v1.EndpointSubset{},
743 })
744 addPods(endpoints.podStore, ns, 0, 1, 1, ipv4only)
745 endpoints.serviceStore.Add(&v1.Service{
746 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
747 Spec: v1.ServiceSpec{
748 Selector: map[string]string{},
749 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
750 },
751 })
752 err := endpoints.syncService(tCtx, ns+"/foo")
753 if err != nil {
754 t.Errorf("Unexpected error syncing service %v", err)
755 }
756
757 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
758 ObjectMeta: metav1.ObjectMeta{
759 Name: "foo",
760 Namespace: ns,
761 ResourceVersion: "1",
762 Labels: map[string]string{
763 v1.IsHeadlessService: "",
764 },
765 },
766 Subsets: []v1.EndpointSubset{{
767 NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
768 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
769 }},
770 })
771 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
772 }
773
774 func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
775 ns := "other"
776 testServer, endpointsHandler := makeTestServer(t, ns)
777 defer testServer.Close()
778
779 tCtx := ktesting.Init(t)
780 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
781 endpoints.endpointsStore.Add(&v1.Endpoints{
782 ObjectMeta: metav1.ObjectMeta{
783 Name: "foo",
784 Namespace: ns,
785 ResourceVersion: "1",
786 },
787 Subsets: []v1.EndpointSubset{},
788 })
789 addPods(endpoints.podStore, ns, 1, 1, 1, ipv4only)
790 endpoints.serviceStore.Add(&v1.Service{
791 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
792 Spec: v1.ServiceSpec{
793 Selector: map[string]string{},
794 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
795 },
796 })
797 err := endpoints.syncService(tCtx, ns+"/foo")
798 if err != nil {
799 t.Errorf("Unexpected error syncing service %v", err)
800 }
801
802 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
803 ObjectMeta: metav1.ObjectMeta{
804 Name: "foo",
805 Namespace: ns,
806 ResourceVersion: "1",
807 Labels: map[string]string{
808 v1.IsHeadlessService: "",
809 },
810 },
811 Subsets: []v1.EndpointSubset{{
812 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
813 NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
814 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
815 }},
816 })
817 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
818 }
819
820 func TestSyncEndpointsItemsPreexisting(t *testing.T) {
821 ns := "bar"
822 testServer, endpointsHandler := makeTestServer(t, ns)
823 defer testServer.Close()
824 tCtx := ktesting.Init(t)
825 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
826 endpoints.endpointsStore.Add(&v1.Endpoints{
827 ObjectMeta: metav1.ObjectMeta{
828 Name: "foo",
829 Namespace: ns,
830 ResourceVersion: "1",
831 },
832 Subsets: []v1.EndpointSubset{{
833 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
834 Ports: []v1.EndpointPort{{Port: 1000}},
835 }},
836 })
837 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
838 endpoints.serviceStore.Add(&v1.Service{
839 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
840 Spec: v1.ServiceSpec{
841 Selector: map[string]string{"foo": "bar"},
842 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
843 },
844 })
845 err := endpoints.syncService(tCtx, ns+"/foo")
846 if err != nil {
847 t.Errorf("Unexpected error syncing service %v", err)
848 }
849
850 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
851 ObjectMeta: metav1.ObjectMeta{
852 Name: "foo",
853 Namespace: ns,
854 ResourceVersion: "1",
855 Labels: map[string]string{
856 v1.IsHeadlessService: "",
857 },
858 },
859 Subsets: []v1.EndpointSubset{{
860 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
861 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
862 }},
863 })
864 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
865 }
866
867 func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
868 ns := metav1.NamespaceDefault
869 testServer, endpointsHandler := makeTestServer(t, ns)
870 defer testServer.Close()
871 tCtx := ktesting.Init(t)
872 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
873 endpoints.endpointsStore.Add(&v1.Endpoints{
874 ObjectMeta: metav1.ObjectMeta{
875 ResourceVersion: "1",
876 Name: "foo",
877 Namespace: ns,
878 },
879 Subsets: []v1.EndpointSubset{{
880 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
881 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
882 }},
883 })
884 addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, ipv4only)
885 endpoints.serviceStore.Add(&v1.Service{
886 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
887 Spec: v1.ServiceSpec{
888 Selector: map[string]string{"foo": "bar"},
889 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
890 },
891 })
892 err := endpoints.syncService(tCtx, ns+"/foo")
893 if err != nil {
894 t.Errorf("Unexpected error syncing service %v", err)
895 }
896 endpointsHandler.ValidateRequestCount(t, 0)
897 }
898
899 func TestSyncEndpointsItems(t *testing.T) {
900 ns := "other"
901 testServer, endpointsHandler := makeTestServer(t, ns)
902 defer testServer.Close()
903 tCtx := ktesting.Init(t)
904 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
905 addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
906 addPods(endpoints.podStore, "blah", 5, 2, 0, ipv4only)
907
908 endpoints.serviceStore.Add(&v1.Service{
909 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
910 Spec: v1.ServiceSpec{
911 Selector: map[string]string{"foo": "bar"},
912 Ports: []v1.ServicePort{
913 {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
914 {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt32(8088)},
915 },
916 },
917 })
918 err := endpoints.syncService(tCtx, "other/foo")
919 if err != nil {
920 t.Errorf("Unexpected error syncing service %v", err)
921 }
922
923 expectedSubsets := []v1.EndpointSubset{{
924 Addresses: []v1.EndpointAddress{
925 {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
926 {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
927 {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
928 },
929 Ports: []v1.EndpointPort{
930 {Name: "port0", Port: 8080, Protocol: "TCP"},
931 {Name: "port1", Port: 8088, Protocol: "TCP"},
932 },
933 }}
934 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
935 ObjectMeta: metav1.ObjectMeta{
936 ResourceVersion: "",
937 Name: "foo",
938 Labels: map[string]string{
939 v1.IsHeadlessService: "",
940 },
941 },
942 Subsets: endptspkg.SortSubsets(expectedSubsets),
943 })
944 endpointsHandler.ValidateRequestCount(t, 1)
945 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
946 }
947
948 func TestSyncEndpointsItemsWithLabels(t *testing.T) {
949 ns := "other"
950 testServer, endpointsHandler := makeTestServer(t, ns)
951 defer testServer.Close()
952 tCtx := ktesting.Init(t)
953 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
954 addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
955 serviceLabels := map[string]string{"foo": "bar"}
956 endpoints.serviceStore.Add(&v1.Service{
957 ObjectMeta: metav1.ObjectMeta{
958 Name: "foo",
959 Namespace: ns,
960 Labels: serviceLabels,
961 },
962 Spec: v1.ServiceSpec{
963 Selector: map[string]string{"foo": "bar"},
964 Ports: []v1.ServicePort{
965 {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
966 {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt32(8088)},
967 },
968 },
969 })
970 err := endpoints.syncService(tCtx, ns+"/foo")
971 if err != nil {
972 t.Errorf("Unexpected error syncing service %v", err)
973 }
974
975 expectedSubsets := []v1.EndpointSubset{{
976 Addresses: []v1.EndpointAddress{
977 {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
978 {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
979 {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
980 },
981 Ports: []v1.EndpointPort{
982 {Name: "port0", Port: 8080, Protocol: "TCP"},
983 {Name: "port1", Port: 8088, Protocol: "TCP"},
984 },
985 }}
986
987 serviceLabels[v1.IsHeadlessService] = ""
988 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
989 ObjectMeta: metav1.ObjectMeta{
990 ResourceVersion: "",
991 Name: "foo",
992 Labels: serviceLabels,
993 },
994 Subsets: endptspkg.SortSubsets(expectedSubsets),
995 })
996 endpointsHandler.ValidateRequestCount(t, 1)
997 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
998 }
999
1000 func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
1001 ns := "bar"
1002 testServer, endpointsHandler := makeTestServer(t, ns)
1003 defer testServer.Close()
1004 tCtx := ktesting.Init(t)
1005 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1006 endpoints.endpointsStore.Add(&v1.Endpoints{
1007 ObjectMeta: metav1.ObjectMeta{
1008 Name: "foo",
1009 Namespace: ns,
1010 ResourceVersion: "1",
1011 Labels: map[string]string{
1012 "foo": "bar",
1013 },
1014 },
1015 Subsets: []v1.EndpointSubset{{
1016 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1017 Ports: []v1.EndpointPort{{Port: 1000}},
1018 }},
1019 })
1020 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
1021 serviceLabels := map[string]string{"baz": "blah"}
1022 endpoints.serviceStore.Add(&v1.Service{
1023 ObjectMeta: metav1.ObjectMeta{
1024 Name: "foo",
1025 Namespace: ns,
1026 Labels: serviceLabels,
1027 },
1028 Spec: v1.ServiceSpec{
1029 Selector: map[string]string{"foo": "bar"},
1030 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
1031 },
1032 })
1033 err := endpoints.syncService(tCtx, ns+"/foo")
1034 if err != nil {
1035 t.Errorf("Unexpected error syncing service %v", err)
1036 }
1037
1038 serviceLabels[v1.IsHeadlessService] = ""
1039 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1040 ObjectMeta: metav1.ObjectMeta{
1041 Name: "foo",
1042 Namespace: ns,
1043 ResourceVersion: "1",
1044 Labels: serviceLabels,
1045 },
1046 Subsets: []v1.EndpointSubset{{
1047 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1048 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
1049 }},
1050 })
1051 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
1052 }
1053
1054 func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
1055 var tests = []struct {
1056 podsSynced func() bool
1057 servicesSynced func() bool
1058 endpointsSynced func() bool
1059 shouldUpdateEndpoints bool
1060 }{
1061 {neverReady, alwaysReady, alwaysReady, false},
1062 {alwaysReady, neverReady, alwaysReady, false},
1063 {alwaysReady, alwaysReady, neverReady, false},
1064 {alwaysReady, alwaysReady, alwaysReady, true},
1065 }
1066
1067 for _, test := range tests {
1068 func() {
1069 ns := "other"
1070 testServer, endpointsHandler := makeTestServer(t, ns)
1071 defer testServer.Close()
1072 tCtx := ktesting.Init(t)
1073 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1074 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
1075
1076 service := &v1.Service{
1077 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
1078 Spec: v1.ServiceSpec{
1079 Selector: map[string]string{},
1080 Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
1081 },
1082 }
1083 endpoints.serviceStore.Add(service)
1084 endpoints.onServiceUpdate(service)
1085 endpoints.podsSynced = test.podsSynced
1086 endpoints.servicesSynced = test.servicesSynced
1087 endpoints.endpointsSynced = test.endpointsSynced
1088 endpoints.workerLoopPeriod = 10 * time.Millisecond
1089 go endpoints.Run(tCtx, 1)
1090
1091
1092
1093
1094 time.Sleep(150 * time.Millisecond)
1095 if test.shouldUpdateEndpoints {
1096
1097 wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
1098 return endpoints.queue.Len() == 0, nil
1099 })
1100 endpointsHandler.ValidateRequestCount(t, 1)
1101 } else {
1102 endpointsHandler.ValidateRequestCount(t, 0)
1103 }
1104 }()
1105 }
1106 }
1107
1108 func TestSyncEndpointsHeadlessService(t *testing.T) {
1109 ns := "headless"
1110 testServer, endpointsHandler := makeTestServer(t, ns)
1111 defer testServer.Close()
1112 tCtx := ktesting.Init(t)
1113 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1114 endpoints.endpointsStore.Add(&v1.Endpoints{
1115 ObjectMeta: metav1.ObjectMeta{
1116 Name: "foo",
1117 Namespace: ns,
1118 ResourceVersion: "1",
1119 },
1120 Subsets: []v1.EndpointSubset{{
1121 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1122 Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
1123 }},
1124 })
1125 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
1126 service := &v1.Service{
1127 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}},
1128 Spec: v1.ServiceSpec{
1129 Selector: map[string]string{},
1130 ClusterIP: api.ClusterIPNone,
1131 Ports: []v1.ServicePort{},
1132 },
1133 }
1134 originalService := service.DeepCopy()
1135 endpoints.serviceStore.Add(service)
1136 err := endpoints.syncService(tCtx, ns+"/foo")
1137 if err != nil {
1138 t.Errorf("Unexpected error syncing service %v", err)
1139 }
1140 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1141 ObjectMeta: metav1.ObjectMeta{
1142 Name: "foo",
1143 Namespace: ns,
1144 ResourceVersion: "1",
1145 Labels: map[string]string{
1146 "a": "b",
1147 v1.IsHeadlessService: "",
1148 },
1149 },
1150 Subsets: []v1.EndpointSubset{{
1151 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1152 Ports: []v1.EndpointPort{},
1153 }},
1154 })
1155 if !reflect.DeepEqual(originalService, service) {
1156 t.Fatalf("syncing endpoints changed service: %s", cmp.Diff(service, originalService))
1157 }
1158 endpointsHandler.ValidateRequestCount(t, 1)
1159 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
1160 }
1161
1162 func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
1163 ns := "other"
1164 testServer, endpointsHandler := makeTestServer(t, ns)
1165 defer testServer.Close()
1166
1167 tCtx := ktesting.Init(t)
1168 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1169 endpoints.endpointsStore.Add(&v1.Endpoints{
1170 ObjectMeta: metav1.ObjectMeta{
1171 Name: "foo",
1172 Namespace: ns,
1173 ResourceVersion: "1",
1174 Labels: map[string]string{
1175 "foo": "bar",
1176 },
1177 },
1178 Subsets: []v1.EndpointSubset{},
1179 })
1180 addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
1181 endpoints.serviceStore.Add(&v1.Service{
1182 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
1183 Spec: v1.ServiceSpec{
1184 Selector: map[string]string{"foo": "bar"},
1185 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
1186 },
1187 })
1188 err := endpoints.syncService(tCtx, ns+"/foo")
1189 if err != nil {
1190 t.Errorf("Unexpected error syncing service %v", err)
1191 }
1192 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1193 ObjectMeta: metav1.ObjectMeta{
1194 Name: "foo",
1195 Namespace: ns,
1196 ResourceVersion: "1",
1197 Labels: map[string]string{
1198 v1.IsHeadlessService: "",
1199 },
1200 },
1201 Subsets: []v1.EndpointSubset{},
1202 })
1203 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
1204 }
1205
1206 func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
1207 ns := "other"
1208 testServer, endpointsHandler := makeTestServer(t, ns)
1209 defer testServer.Close()
1210
1211 tCtx := ktesting.Init(t)
1212 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1213 endpoints.endpointsStore.Add(&v1.Endpoints{
1214 ObjectMeta: metav1.ObjectMeta{
1215 Name: "foo",
1216 Namespace: ns,
1217 ResourceVersion: "1",
1218 Labels: map[string]string{
1219 "foo": "bar",
1220 },
1221 },
1222 Subsets: []v1.EndpointSubset{},
1223 })
1224 addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
1225 endpoints.serviceStore.Add(&v1.Service{
1226 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
1227 Spec: v1.ServiceSpec{
1228 Selector: map[string]string{"foo": "bar"},
1229 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
1230 },
1231 })
1232 err := endpoints.syncService(tCtx, ns+"/foo")
1233 if err != nil {
1234 t.Errorf("Unexpected error syncing service %v", err)
1235 }
1236 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1237 ObjectMeta: metav1.ObjectMeta{
1238 Name: "foo",
1239 Namespace: ns,
1240 ResourceVersion: "1",
1241 Labels: map[string]string{
1242 v1.IsHeadlessService: "",
1243 },
1244 },
1245 Subsets: []v1.EndpointSubset{},
1246 })
1247 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
1248 }
1249
1250 func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
1251 ns := "other"
1252 testServer, endpointsHandler := makeTestServer(t, ns)
1253 defer testServer.Close()
1254
1255 tCtx := ktesting.Init(t)
1256 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1257 endpoints.endpointsStore.Add(&v1.Endpoints{
1258 ObjectMeta: metav1.ObjectMeta{
1259 Name: "foo",
1260 Namespace: ns,
1261 ResourceVersion: "1",
1262 Labels: map[string]string{
1263 "foo": "bar",
1264 },
1265 },
1266 Subsets: []v1.EndpointSubset{},
1267 })
1268 addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
1269 endpoints.serviceStore.Add(&v1.Service{
1270 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
1271 Spec: v1.ServiceSpec{
1272 Selector: map[string]string{"foo": "bar"},
1273 Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
1274 },
1275 })
1276 err := endpoints.syncService(tCtx, ns+"/foo")
1277 if err != nil {
1278 t.Errorf("Unexpected error syncing service %v", err)
1279 }
1280
1281 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1282 ObjectMeta: metav1.ObjectMeta{
1283 Name: "foo",
1284 Namespace: ns,
1285 ResourceVersion: "1",
1286 Labels: map[string]string{
1287 v1.IsHeadlessService: "",
1288 },
1289 },
1290 Subsets: []v1.EndpointSubset{},
1291 })
1292 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
1293 }
1294
1295 func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
1296 ns := metav1.NamespaceDefault
1297 testServer, endpointsHandler := makeTestServer(t, ns)
1298 defer testServer.Close()
1299 tCtx := ktesting.Init(t)
1300 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1301 endpoints.serviceStore.Add(&v1.Service{
1302 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
1303 Spec: v1.ServiceSpec{
1304 Selector: map[string]string{"foo": "bar"},
1305 ClusterIP: "None",
1306 Ports: nil,
1307 },
1308 })
1309 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
1310
1311 err := endpoints.syncService(tCtx, ns+"/foo")
1312 if err != nil {
1313 t.Errorf("Unexpected error syncing service %v", err)
1314 }
1315 endpointsHandler.ValidateRequestCount(t, 1)
1316 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1317 ObjectMeta: metav1.ObjectMeta{
1318 Name: "foo",
1319 Labels: map[string]string{
1320 v1.IsHeadlessService: "",
1321 },
1322 },
1323 Subsets: []v1.EndpointSubset{{
1324 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1325 Ports: nil,
1326 }},
1327 })
1328 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
1329 }
1330
1331 func TestPodToEndpointAddressForService(t *testing.T) {
1332 ipv4 := v1.IPv4Protocol
1333 ipv6 := v1.IPv6Protocol
1334
1335 testCases := []struct {
1336 name string
1337 ipFamilies []v1.IPFamily
1338 service v1.Service
1339 expectedEndpointFamily v1.IPFamily
1340 expectError bool
1341 }{
1342 {
1343 name: "v4 service, in a single stack cluster",
1344 ipFamilies: ipv4only,
1345 service: v1.Service{
1346 Spec: v1.ServiceSpec{
1347 ClusterIP: "10.0.0.1",
1348 },
1349 },
1350 expectedEndpointFamily: ipv4,
1351 },
1352 {
1353 name: "v4 service, in a dual stack cluster",
1354 ipFamilies: ipv4ipv6,
1355 service: v1.Service{
1356 Spec: v1.ServiceSpec{
1357 ClusterIP: "10.0.0.1",
1358 },
1359 },
1360 expectedEndpointFamily: ipv4,
1361 },
1362 {
1363 name: "v4 service, in a dual stack ipv6-primary cluster",
1364 ipFamilies: ipv6ipv4,
1365 service: v1.Service{
1366 Spec: v1.ServiceSpec{
1367 ClusterIP: "10.0.0.1",
1368 },
1369 },
1370 expectedEndpointFamily: ipv4,
1371 },
1372 {
1373 name: "v4 headless service, in a single stack cluster",
1374 ipFamilies: ipv4only,
1375 service: v1.Service{
1376 Spec: v1.ServiceSpec{
1377 ClusterIP: v1.ClusterIPNone,
1378 },
1379 },
1380 expectedEndpointFamily: ipv4,
1381 },
1382 {
1383 name: "v4 headless service, in a dual stack cluster",
1384 ipFamilies: ipv4ipv6,
1385 service: v1.Service{
1386 Spec: v1.ServiceSpec{
1387 ClusterIP: v1.ClusterIPNone,
1388 IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
1389 },
1390 },
1391 expectedEndpointFamily: ipv4,
1392 },
1393 {
1394 name: "v4 legacy headless service, in a dual stack cluster",
1395 ipFamilies: ipv4ipv6,
1396 service: v1.Service{
1397 Spec: v1.ServiceSpec{
1398 ClusterIP: v1.ClusterIPNone,
1399 },
1400 },
1401 expectedEndpointFamily: ipv4,
1402 },
1403 {
1404 name: "v4 legacy headless service, in a dual stack ipv6-primary cluster",
1405 ipFamilies: ipv6ipv4,
1406 service: v1.Service{
1407 Spec: v1.ServiceSpec{
1408 ClusterIP: v1.ClusterIPNone,
1409 },
1410 },
1411 expectedEndpointFamily: ipv6,
1412 },
1413 {
1414 name: "v6 service, in a dual stack cluster",
1415 ipFamilies: ipv4ipv6,
1416 service: v1.Service{
1417 Spec: v1.ServiceSpec{
1418 ClusterIP: "3000::1",
1419 },
1420 },
1421 expectedEndpointFamily: ipv6,
1422 },
1423 {
1424 name: "v6 headless service, in a single stack cluster",
1425 ipFamilies: ipv6only,
1426 service: v1.Service{
1427 Spec: v1.ServiceSpec{
1428 ClusterIP: v1.ClusterIPNone,
1429 },
1430 },
1431 expectedEndpointFamily: ipv6,
1432 },
1433 {
1434 name: "v6 headless service, in a dual stack cluster (connected to a new api-server)",
1435 ipFamilies: ipv4ipv6,
1436 service: v1.Service{
1437 Spec: v1.ServiceSpec{
1438 ClusterIP: v1.ClusterIPNone,
1439 IPFamilies: []v1.IPFamily{v1.IPv6Protocol},
1440 },
1441 },
1442 expectedEndpointFamily: ipv6,
1443 },
1444 {
1445 name: "v6 legacy headless service, in a dual stack cluster (connected to a old api-server)",
1446 ipFamilies: ipv4ipv6,
1447 service: v1.Service{
1448 Spec: v1.ServiceSpec{
1449 ClusterIP: v1.ClusterIPNone,
1450 },
1451 },
1452 expectedEndpointFamily: ipv4,
1453 },
1454
1455
1456
1457
1458
1459 {
1460 name: "v6 service, in a v4 only cluster.",
1461 ipFamilies: ipv4only,
1462 service: v1.Service{
1463 Spec: v1.ServiceSpec{
1464 ClusterIP: "3000::1",
1465 },
1466 },
1467 expectError: true,
1468 expectedEndpointFamily: ipv4,
1469 },
1470
1471 {
1472 name: "v6 service, in a v4 only cluster",
1473 ipFamilies: ipv4only,
1474 service: v1.Service{
1475 Spec: v1.ServiceSpec{
1476 ClusterIP: "3000::1",
1477 },
1478 },
1479 expectError: true,
1480 },
1481 }
1482 for _, tc := range testCases {
1483 t.Run(tc.name, func(t *testing.T) {
1484 podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
1485 ns := "test"
1486 addPods(podStore, ns, 1, 1, 0, tc.ipFamilies)
1487 pods := podStore.List()
1488 if len(pods) != 1 {
1489 t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
1490 }
1491 pod := pods[0].(*v1.Pod)
1492 epa, err := podToEndpointAddressForService(&tc.service, pod)
1493
1494 if err != nil && !tc.expectError {
1495 t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
1496 }
1497
1498 if err == nil && tc.expectError {
1499 t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
1500 }
1501
1502 if err != nil && tc.expectError {
1503 return
1504 }
1505
1506 if utilnet.IsIPv6String(epa.IP) != (tc.expectedEndpointFamily == ipv6) {
1507 t.Fatalf("IP: expected %s, got: %s", tc.expectedEndpointFamily, epa.IP)
1508 }
1509 if *(epa.NodeName) != pod.Spec.NodeName {
1510 t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
1511 }
1512 if epa.TargetRef.Kind != "Pod" {
1513 t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
1514 }
1515 if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
1516 t.Fatalf("TargetRef.Namespace: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
1517 }
1518 if epa.TargetRef.Name != pod.ObjectMeta.Name {
1519 t.Fatalf("TargetRef.Name: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
1520 }
1521 if epa.TargetRef.UID != pod.ObjectMeta.UID {
1522 t.Fatalf("TargetRef.UID: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
1523 }
1524 if epa.TargetRef.ResourceVersion != "" {
1525 t.Fatalf("TargetRef.ResourceVersion: expected empty, got: %s", epa.TargetRef.ResourceVersion)
1526 }
1527 })
1528 }
1529
1530 }
1531
1532 func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
1533 ns := "other"
1534 testServer, endpointsHandler := makeTestServer(t, ns)
1535 defer testServer.Close()
1536
1537 tCtx := ktesting.Init(t)
1538 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1539 endpoints.endpointsStore.Add(&v1.Endpoints{
1540 ObjectMeta: metav1.ObjectMeta{
1541 Name: "foo",
1542 Namespace: ns,
1543 ResourceVersion: "1",
1544 },
1545 Subsets: []v1.EndpointSubset{{
1546 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1547 Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
1548 }},
1549 })
1550 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
1551 endpoints.serviceStore.Add(&v1.Service{
1552 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
1553 Spec: v1.ServiceSpec{
1554 Selector: map[string]string{},
1555 Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
1556 },
1557 })
1558 err := endpoints.syncService(tCtx, ns+"/foo")
1559 if err != nil {
1560 t.Errorf("Unexpected error syncing service %v", err)
1561 }
1562
1563 endpointsHandler.ValidateRequestCount(t, 1)
1564 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1565 ObjectMeta: metav1.ObjectMeta{
1566 Name: "foo",
1567 Namespace: ns,
1568 ResourceVersion: "1",
1569 Annotations: map[string]string{
1570 v1.EndpointsLastChangeTriggerTime: triggerTimeString,
1571 },
1572 Labels: map[string]string{
1573 v1.IsHeadlessService: "",
1574 },
1575 },
1576 Subsets: []v1.EndpointSubset{{
1577 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1578 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
1579 }},
1580 })
1581 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
1582 }
1583
1584 func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
1585 ns := "other"
1586 testServer, endpointsHandler := makeTestServer(t, ns)
1587 defer testServer.Close()
1588
1589 tCtx := ktesting.Init(t)
1590 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1591 endpoints.endpointsStore.Add(&v1.Endpoints{
1592 ObjectMeta: metav1.ObjectMeta{
1593 Name: "foo",
1594 Namespace: ns,
1595 ResourceVersion: "1",
1596 Annotations: map[string]string{
1597 v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
1598 },
1599 },
1600 Subsets: []v1.EndpointSubset{{
1601 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1602 Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
1603 }},
1604 })
1605 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
1606 endpoints.serviceStore.Add(&v1.Service{
1607 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
1608 Spec: v1.ServiceSpec{
1609 Selector: map[string]string{},
1610 Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
1611 },
1612 })
1613 err := endpoints.syncService(tCtx, ns+"/foo")
1614 if err != nil {
1615 t.Errorf("Unexpected error syncing service %v", err)
1616 }
1617
1618 endpointsHandler.ValidateRequestCount(t, 1)
1619 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1620 ObjectMeta: metav1.ObjectMeta{
1621 Name: "foo",
1622 Namespace: ns,
1623 ResourceVersion: "1",
1624 Annotations: map[string]string{
1625 v1.EndpointsLastChangeTriggerTime: triggerTimeString,
1626 },
1627 Labels: map[string]string{
1628 v1.IsHeadlessService: "",
1629 },
1630 },
1631 Subsets: []v1.EndpointSubset{{
1632 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1633 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
1634 }},
1635 })
1636 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
1637 }
1638
1639 func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
1640 ns := "other"
1641 testServer, endpointsHandler := makeTestServer(t, ns)
1642 defer testServer.Close()
1643
1644 tCtx := ktesting.Init(t)
1645 endpoints := newController(tCtx, testServer.URL, 0*time.Second)
1646 endpoints.endpointsStore.Add(&v1.Endpoints{
1647 ObjectMeta: metav1.ObjectMeta{
1648 Name: "foo",
1649 Namespace: ns,
1650 ResourceVersion: "1",
1651 Annotations: map[string]string{
1652 v1.EndpointsLastChangeTriggerTime: triggerTimeString,
1653 },
1654 },
1655 Subsets: []v1.EndpointSubset{{
1656 Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1657 Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
1658 }},
1659 })
1660
1661 addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
1662 endpoints.serviceStore.Add(&v1.Service{
1663 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
1664 Spec: v1.ServiceSpec{
1665 Selector: map[string]string{},
1666 Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
1667 },
1668 })
1669 err := endpoints.syncService(tCtx, ns+"/foo")
1670 if err != nil {
1671 t.Errorf("Unexpected error syncing service %v", err)
1672 }
1673
1674 endpointsHandler.ValidateRequestCount(t, 1)
1675 data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
1676 ObjectMeta: metav1.ObjectMeta{
1677 Name: "foo",
1678 Namespace: ns,
1679 ResourceVersion: "1",
1680 Labels: map[string]string{
1681 v1.IsHeadlessService: "",
1682 },
1683 },
1684 Subsets: []v1.EndpointSubset{{
1685 Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1686 Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
1687 }},
1688 })
1689 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
1690 }
1691
1692
1693
1694
1695 func TestPodUpdatesBatching(t *testing.T) {
1696 type podUpdate struct {
1697 delay time.Duration
1698 podName string
1699 podIP string
1700 }
1701
1702 tests := []struct {
1703 name string
1704 batchPeriod time.Duration
1705 podsCount int
1706 updates []podUpdate
1707 finalDelay time.Duration
1708 wantRequestCount int
1709 }{
1710 {
1711 name: "three updates with no batching",
1712 batchPeriod: 0 * time.Second,
1713 podsCount: 10,
1714 updates: []podUpdate{
1715 {
1716
1717 delay: 200 * time.Millisecond,
1718 podName: "pod0",
1719 podIP: "10.0.0.0",
1720 },
1721 {
1722 delay: 100 * time.Millisecond,
1723 podName: "pod1",
1724 podIP: "10.0.0.1",
1725 },
1726 {
1727 delay: 100 * time.Millisecond,
1728 podName: "pod2",
1729 podIP: "10.0.0.2",
1730 },
1731 },
1732 finalDelay: 3 * time.Second,
1733 wantRequestCount: 3,
1734 },
1735 {
1736 name: "three updates in one batch",
1737 batchPeriod: 1 * time.Second,
1738 podsCount: 10,
1739 updates: []podUpdate{
1740 {
1741
1742 delay: 200 * time.Millisecond,
1743 podName: "pod0",
1744 podIP: "10.0.0.0",
1745 },
1746 {
1747 delay: 100 * time.Millisecond,
1748 podName: "pod1",
1749 podIP: "10.0.0.1",
1750 },
1751 {
1752 delay: 100 * time.Millisecond,
1753 podName: "pod2",
1754 podIP: "10.0.0.2",
1755 },
1756 },
1757 finalDelay: 3 * time.Second,
1758 wantRequestCount: 1,
1759 },
1760 {
1761 name: "three updates in two batches",
1762 batchPeriod: 1 * time.Second,
1763 podsCount: 10,
1764 updates: []podUpdate{
1765 {
1766
1767 delay: 200 * time.Millisecond,
1768 podName: "pod0",
1769 podIP: "10.0.0.0",
1770 },
1771 {
1772 delay: 100 * time.Millisecond,
1773 podName: "pod1",
1774 podIP: "10.0.0.1",
1775 },
1776 {
1777 delay: 1 * time.Second,
1778 podName: "pod2",
1779 podIP: "10.0.0.2",
1780 },
1781 },
1782 finalDelay: 3 * time.Second,
1783 wantRequestCount: 2,
1784 },
1785 }
1786
1787 for _, tc := range tests {
1788 t.Run(tc.name, func(t *testing.T) {
1789 ns := "other"
1790 resourceVersion := 1
1791 testServer, endpointsHandler := makeTestServer(t, ns)
1792 defer testServer.Close()
1793
1794 tCtx := ktesting.Init(t)
1795 endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
1796 endpoints.podsSynced = alwaysReady
1797 endpoints.servicesSynced = alwaysReady
1798 endpoints.endpointsSynced = alwaysReady
1799 endpoints.workerLoopPeriod = 10 * time.Millisecond
1800
1801 go endpoints.Run(tCtx, 1)
1802
1803 addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)
1804
1805 endpoints.serviceStore.Add(&v1.Service{
1806 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
1807 Spec: v1.ServiceSpec{
1808 Selector: map[string]string{"foo": "bar"},
1809 Ports: []v1.ServicePort{{Port: 80}},
1810 },
1811 })
1812
1813 for _, update := range tc.updates {
1814 time.Sleep(update.delay)
1815
1816 old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
1817 if err != nil {
1818 t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
1819 }
1820 if !exists {
1821 t.Fatalf("Pod %q doesn't exist", update.podName)
1822 }
1823 oldPod := old.(*v1.Pod)
1824 newPod := oldPod.DeepCopy()
1825 newPod.Status.PodIP = update.podIP
1826 newPod.Status.PodIPs[0].IP = update.podIP
1827 newPod.ResourceVersion = strconv.Itoa(resourceVersion)
1828 resourceVersion++
1829
1830 endpoints.podStore.Update(newPod)
1831 endpoints.updatePod(oldPod, newPod)
1832 }
1833
1834 time.Sleep(tc.finalDelay)
1835 endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
1836 })
1837 }
1838 }
1839
1840
1841
1842
1843 func TestPodAddsBatching(t *testing.T) {
1844 type podAdd struct {
1845 delay time.Duration
1846 }
1847
1848 tests := []struct {
1849 name string
1850 batchPeriod time.Duration
1851 adds []podAdd
1852 finalDelay time.Duration
1853 wantRequestCount int
1854 }{
1855 {
1856 name: "three adds with no batching",
1857 batchPeriod: 0 * time.Second,
1858 adds: []podAdd{
1859 {
1860
1861 delay: 200 * time.Millisecond,
1862 },
1863 {
1864 delay: 100 * time.Millisecond,
1865 },
1866 {
1867 delay: 100 * time.Millisecond,
1868 },
1869 },
1870 finalDelay: 3 * time.Second,
1871 wantRequestCount: 3,
1872 },
1873 {
1874 name: "three adds in one batch",
1875 batchPeriod: 1 * time.Second,
1876 adds: []podAdd{
1877 {
1878
1879 delay: 200 * time.Millisecond,
1880 },
1881 {
1882 delay: 100 * time.Millisecond,
1883 },
1884 {
1885 delay: 100 * time.Millisecond,
1886 },
1887 },
1888 finalDelay: 3 * time.Second,
1889 wantRequestCount: 1,
1890 },
1891 {
1892 name: "three adds in two batches",
1893 batchPeriod: 1 * time.Second,
1894 adds: []podAdd{
1895 {
1896
1897 delay: 200 * time.Millisecond,
1898 },
1899 {
1900 delay: 100 * time.Millisecond,
1901 },
1902 {
1903 delay: 1 * time.Second,
1904 },
1905 },
1906 finalDelay: 3 * time.Second,
1907 wantRequestCount: 2,
1908 },
1909 }
1910
1911 for _, tc := range tests {
1912 t.Run(tc.name, func(t *testing.T) {
1913 ns := "other"
1914 testServer, endpointsHandler := makeTestServer(t, ns)
1915 defer testServer.Close()
1916
1917 tCtx := ktesting.Init(t)
1918 endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
1919 endpoints.podsSynced = alwaysReady
1920 endpoints.servicesSynced = alwaysReady
1921 endpoints.endpointsSynced = alwaysReady
1922 endpoints.workerLoopPeriod = 10 * time.Millisecond
1923
1924 go endpoints.Run(tCtx, 1)
1925
1926 endpoints.serviceStore.Add(&v1.Service{
1927 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
1928 Spec: v1.ServiceSpec{
1929 Selector: map[string]string{"foo": "bar"},
1930 Ports: []v1.ServicePort{{Port: 80}},
1931 },
1932 })
1933
1934 for i, add := range tc.adds {
1935 time.Sleep(add.delay)
1936
1937 p := testPod(ns, i, 1, true, ipv4only)
1938 endpoints.podStore.Add(p)
1939 endpoints.addPod(p)
1940 }
1941
1942 time.Sleep(tc.finalDelay)
1943 endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
1944 })
1945 }
1946 }
1947
1948
1949
1950
1951 func TestPodDeleteBatching(t *testing.T) {
1952 type podDelete struct {
1953 delay time.Duration
1954 podName string
1955 }
1956
1957 tests := []struct {
1958 name string
1959 batchPeriod time.Duration
1960 podsCount int
1961 deletes []podDelete
1962 finalDelay time.Duration
1963 wantRequestCount int
1964 }{
1965 {
1966 name: "three deletes with no batching",
1967 batchPeriod: 0 * time.Second,
1968 podsCount: 10,
1969 deletes: []podDelete{
1970 {
1971
1972 delay: 200 * time.Millisecond,
1973 podName: "pod0",
1974 },
1975 {
1976 delay: 100 * time.Millisecond,
1977 podName: "pod1",
1978 },
1979 {
1980 delay: 100 * time.Millisecond,
1981 podName: "pod2",
1982 },
1983 },
1984 finalDelay: 3 * time.Second,
1985 wantRequestCount: 3,
1986 },
1987 {
1988 name: "three deletes in one batch",
1989 batchPeriod: 1 * time.Second,
1990 podsCount: 10,
1991 deletes: []podDelete{
1992 {
1993
1994 delay: 200 * time.Millisecond,
1995 podName: "pod0",
1996 },
1997 {
1998 delay: 100 * time.Millisecond,
1999 podName: "pod1",
2000 },
2001 {
2002 delay: 100 * time.Millisecond,
2003 podName: "pod2",
2004 },
2005 },
2006 finalDelay: 3 * time.Second,
2007 wantRequestCount: 1,
2008 },
2009 {
2010 name: "three deletes in two batches",
2011 batchPeriod: 1 * time.Second,
2012 podsCount: 10,
2013 deletes: []podDelete{
2014 {
2015
2016 delay: 200 * time.Millisecond,
2017 podName: "pod0",
2018 },
2019 {
2020 delay: 100 * time.Millisecond,
2021 podName: "pod1",
2022 },
2023 {
2024 delay: 1 * time.Second,
2025 podName: "pod2",
2026 },
2027 },
2028 finalDelay: 3 * time.Second,
2029 wantRequestCount: 2,
2030 },
2031 }
2032
2033 for _, tc := range tests {
2034 t.Run(tc.name, func(t *testing.T) {
2035 ns := "other"
2036 testServer, endpointsHandler := makeTestServer(t, ns)
2037 defer testServer.Close()
2038
2039 tCtx := ktesting.Init(t)
2040 endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
2041 endpoints.podsSynced = alwaysReady
2042 endpoints.servicesSynced = alwaysReady
2043 endpoints.endpointsSynced = alwaysReady
2044 endpoints.workerLoopPeriod = 10 * time.Millisecond
2045
2046 go endpoints.Run(tCtx, 1)
2047
2048 addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)
2049
2050 endpoints.serviceStore.Add(&v1.Service{
2051 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
2052 Spec: v1.ServiceSpec{
2053 Selector: map[string]string{"foo": "bar"},
2054 Ports: []v1.ServicePort{{Port: 80}},
2055 },
2056 })
2057
2058 for _, update := range tc.deletes {
2059 time.Sleep(update.delay)
2060
2061 old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
2062 if err != nil {
2063 t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
2064 }
2065 if !exists {
2066 t.Fatalf("Pod %q doesn't exist", update.podName)
2067 }
2068 endpoints.podStore.Delete(old)
2069 endpoints.deletePod(old)
2070 }
2071
2072 time.Sleep(tc.finalDelay)
2073 endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
2074 })
2075 }
2076 }
2077
2078 func TestSyncEndpointsServiceNotFound(t *testing.T) {
2079 ns := metav1.NamespaceDefault
2080 testServer, endpointsHandler := makeTestServer(t, ns)
2081 defer testServer.Close()
2082
2083 tCtx := ktesting.Init(t)
2084 endpoints := newController(tCtx, testServer.URL, 0)
2085 endpoints.endpointsStore.Add(&v1.Endpoints{
2086 ObjectMeta: metav1.ObjectMeta{
2087 Name: "foo",
2088 Namespace: ns,
2089 ResourceVersion: "1",
2090 },
2091 })
2092 err := endpoints.syncService(tCtx, ns+"/foo")
2093 if err != nil {
2094 t.Errorf("Unexpected error syncing service %v", err)
2095 }
2096 endpointsHandler.ValidateRequestCount(t, 1)
2097 endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil)
2098 }
2099
2100 func TestSyncServiceOverCapacity(t *testing.T) {
2101 testCases := []struct {
2102 name string
2103 startingAnnotation *string
2104 numExisting int
2105 numDesired int
2106 numDesiredNotReady int
2107 numExpectedReady int
2108 numExpectedNotReady int
2109 expectedAnnotation bool
2110 }{{
2111 name: "empty",
2112 startingAnnotation: nil,
2113 numExisting: 0,
2114 numDesired: 0,
2115 numExpectedReady: 0,
2116 numExpectedNotReady: 0,
2117 expectedAnnotation: false,
2118 }, {
2119 name: "annotation added past capacity, < than maxCapacity of Ready Addresses",
2120 startingAnnotation: nil,
2121 numExisting: maxCapacity - 1,
2122 numDesired: maxCapacity - 3,
2123 numDesiredNotReady: 4,
2124 numExpectedReady: maxCapacity - 3,
2125 numExpectedNotReady: 3,
2126 expectedAnnotation: true,
2127 }, {
2128 name: "annotation added past capacity, maxCapacity of Ready Addresses ",
2129 startingAnnotation: nil,
2130 numExisting: maxCapacity - 1,
2131 numDesired: maxCapacity,
2132 numDesiredNotReady: 10,
2133 numExpectedReady: maxCapacity,
2134 numExpectedNotReady: 0,
2135 expectedAnnotation: true,
2136 }, {
2137 name: "annotation removed below capacity",
2138 startingAnnotation: pointer.String("truncated"),
2139 numExisting: maxCapacity - 1,
2140 numDesired: maxCapacity - 1,
2141 numDesiredNotReady: 0,
2142 numExpectedReady: maxCapacity - 1,
2143 numExpectedNotReady: 0,
2144 expectedAnnotation: false,
2145 }, {
2146 name: "annotation was set to warning previously, annotation removed at capacity",
2147 startingAnnotation: pointer.String("warning"),
2148 numExisting: maxCapacity,
2149 numDesired: maxCapacity,
2150 numDesiredNotReady: 0,
2151 numExpectedReady: maxCapacity,
2152 numExpectedNotReady: 0,
2153 expectedAnnotation: false,
2154 }, {
2155 name: "annotation was set to warning previously but still over capacity",
2156 startingAnnotation: pointer.String("warning"),
2157 numExisting: maxCapacity + 1,
2158 numDesired: maxCapacity + 1,
2159 numDesiredNotReady: 0,
2160 numExpectedReady: maxCapacity,
2161 numExpectedNotReady: 0,
2162 expectedAnnotation: true,
2163 }, {
2164 name: "annotation removed at capacity",
2165 startingAnnotation: pointer.String("truncated"),
2166 numExisting: maxCapacity,
2167 numDesired: maxCapacity,
2168 numDesiredNotReady: 0,
2169 numExpectedReady: maxCapacity,
2170 numExpectedNotReady: 0,
2171 expectedAnnotation: false,
2172 }, {
2173 name: "no endpoints change, annotation value corrected",
2174 startingAnnotation: pointer.String("invalid"),
2175 numExisting: maxCapacity + 1,
2176 numDesired: maxCapacity + 1,
2177 numDesiredNotReady: 0,
2178 numExpectedReady: maxCapacity,
2179 numExpectedNotReady: 0,
2180 expectedAnnotation: true,
2181 }}
2182
2183 for _, tc := range testCases {
2184 t.Run(tc.name, func(t *testing.T) {
2185 tCtx := ktesting.Init(t)
2186 ns := "test"
2187 client, c := newFakeController(tCtx, 0*time.Second)
2188
2189 addPods(c.podStore, ns, tc.numDesired, 1, tc.numDesiredNotReady, ipv4only)
2190 pods := c.podStore.List()
2191
2192 svc := &v1.Service{
2193 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
2194 Spec: v1.ServiceSpec{
2195 Selector: map[string]string{"foo": "bar"},
2196 Ports: []v1.ServicePort{{Port: 80}},
2197 },
2198 }
2199 c.serviceStore.Add(svc)
2200
2201 subset := v1.EndpointSubset{}
2202 for i := 0; i < tc.numExisting; i++ {
2203 pod := pods[i].(*v1.Pod)
2204 epa, _ := podToEndpointAddressForService(svc, pod)
2205 subset.Addresses = append(subset.Addresses, *epa)
2206 }
2207 endpoints := &v1.Endpoints{
2208 ObjectMeta: metav1.ObjectMeta{
2209 Name: svc.Name,
2210 Namespace: ns,
2211 ResourceVersion: "1",
2212 Annotations: map[string]string{},
2213 },
2214 Subsets: []v1.EndpointSubset{subset},
2215 }
2216 if tc.startingAnnotation != nil {
2217 endpoints.Annotations[v1.EndpointsOverCapacity] = *tc.startingAnnotation
2218 }
2219 c.endpointsStore.Add(endpoints)
2220 _, err := client.CoreV1().Endpoints(ns).Create(tCtx, endpoints, metav1.CreateOptions{})
2221 if err != nil {
2222 t.Fatalf("unexpected error creating endpoints: %v", err)
2223 }
2224
2225 err = c.syncService(tCtx, fmt.Sprintf("%s/%s", ns, svc.Name))
2226 if err != nil {
2227 t.Errorf("Unexpected error syncing service %v", err)
2228 }
2229
2230 actualEndpoints, err := client.CoreV1().Endpoints(ns).Get(tCtx, endpoints.Name, metav1.GetOptions{})
2231 if err != nil {
2232 t.Fatalf("unexpected error getting endpoints: %v", err)
2233 }
2234
2235 actualAnnotation, ok := actualEndpoints.Annotations[v1.EndpointsOverCapacity]
2236 if tc.expectedAnnotation {
2237 if !ok {
2238 t.Errorf("Expected EndpointsOverCapacity annotation to be set")
2239 } else if actualAnnotation != "truncated" {
2240 t.Errorf("Expected EndpointsOverCapacity annotation to be 'truncated', got %s", actualAnnotation)
2241 }
2242 } else {
2243 if ok {
2244 t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation)
2245 }
2246 }
2247 numActualReady := 0
2248 numActualNotReady := 0
2249 for _, subset := range actualEndpoints.Subsets {
2250 numActualReady += len(subset.Addresses)
2251 numActualNotReady += len(subset.NotReadyAddresses)
2252 }
2253 if numActualReady != tc.numExpectedReady {
2254 t.Errorf("Unexpected number of actual ready Endpoints: got %d endpoints, want %d endpoints", numActualReady, tc.numExpectedReady)
2255 }
2256 if numActualNotReady != tc.numExpectedNotReady {
2257 t.Errorf("Unexpected number of actual not ready Endpoints: got %d endpoints, want %d endpoints", numActualNotReady, tc.numExpectedNotReady)
2258 }
2259 })
2260 }
2261 }
2262
2263 func TestTruncateEndpoints(t *testing.T) {
2264 testCases := []struct {
2265 desc string
2266
2267
2268 subsetsReady []int
2269 subsetsNotReady []int
2270 expectedReady []int
2271 expectedNotReady []int
2272 }{{
2273 desc: "empty",
2274 subsetsReady: []int{},
2275 subsetsNotReady: []int{},
2276 expectedReady: []int{},
2277 expectedNotReady: []int{},
2278 }, {
2279 desc: "total endpoints < max capacity",
2280 subsetsReady: []int{50, 100, 100, 100, 100},
2281 subsetsNotReady: []int{50, 100, 100, 100, 100},
2282 expectedReady: []int{50, 100, 100, 100, 100},
2283 expectedNotReady: []int{50, 100, 100, 100, 100},
2284 }, {
2285 desc: "total endpoints = max capacity",
2286 subsetsReady: []int{100, 100, 100, 100, 100},
2287 subsetsNotReady: []int{100, 100, 100, 100, 100},
2288 expectedReady: []int{100, 100, 100, 100, 100},
2289 expectedNotReady: []int{100, 100, 100, 100, 100},
2290 }, {
2291 desc: "total ready endpoints < max capacity, but total endpoints > max capacity",
2292 subsetsReady: []int{90, 110, 50, 10, 20},
2293 subsetsNotReady: []int{101, 200, 200, 201, 298},
2294 expectedReady: []int{90, 110, 50, 10, 20},
2295 expectedNotReady: []int{73, 144, 144, 145, 214},
2296 }, {
2297 desc: "total ready endpoints > max capacity",
2298 subsetsReady: []int{205, 400, 402, 400, 693},
2299 subsetsNotReady: []int{100, 200, 200, 200, 300},
2300 expectedReady: []int{98, 191, 192, 191, 328},
2301 expectedNotReady: []int{0, 0, 0, 0, 0},
2302 }}
2303
2304 for _, tc := range testCases {
2305 t.Run(tc.desc, func(t *testing.T) {
2306 var subsets []v1.EndpointSubset
2307 for subsetIndex, numReady := range tc.subsetsReady {
2308 subset := v1.EndpointSubset{}
2309 for i := 0; i < numReady; i++ {
2310 subset.Addresses = append(subset.Addresses, v1.EndpointAddress{})
2311 }
2312
2313 numNotReady := tc.subsetsNotReady[subsetIndex]
2314 for i := 0; i < numNotReady; i++ {
2315 subset.NotReadyAddresses = append(subset.NotReadyAddresses, v1.EndpointAddress{})
2316 }
2317 subsets = append(subsets, subset)
2318 }
2319
2320 endpoints := &v1.Endpoints{Subsets: subsets}
2321 truncateEndpoints(endpoints)
2322
2323 for i, subset := range endpoints.Subsets {
2324 if len(subset.Addresses) != tc.expectedReady[i] {
2325 t.Errorf("Unexpected number of actual ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.Addresses), tc.expectedReady[i])
2326 }
2327 if len(subset.NotReadyAddresses) != tc.expectedNotReady[i] {
2328 t.Errorf("Unexpected number of actual not ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.NotReadyAddresses), tc.expectedNotReady[i])
2329 }
2330 }
2331 })
2332 }
2333 }
2334
2335 func TestEndpointPortFromServicePort(t *testing.T) {
2336 http := pointer.String("http")
2337 testCases := map[string]struct {
2338 serviceAppProtocol *string
2339 expectedEndpointsAppProtocol *string
2340 }{
2341 "empty app protocol": {
2342 serviceAppProtocol: nil,
2343 expectedEndpointsAppProtocol: nil,
2344 },
2345 "http app protocol": {
2346 serviceAppProtocol: http,
2347 expectedEndpointsAppProtocol: http,
2348 },
2349 }
2350
2351 for name, tc := range testCases {
2352 t.Run(name, func(t *testing.T) {
2353 epp := endpointPortFromServicePort(&v1.ServicePort{Name: "test", AppProtocol: tc.serviceAppProtocol}, 80)
2354
2355 if epp.AppProtocol != tc.expectedEndpointsAppProtocol {
2356 t.Errorf("Expected Endpoints AppProtocol to be %s, got %s", stringVal(tc.expectedEndpointsAppProtocol), stringVal(epp.AppProtocol))
2357 }
2358 })
2359 }
2360 }
2361
2362
2363
2364
2365 func TestMultipleServiceChanges(t *testing.T) {
2366 ns := metav1.NamespaceDefault
2367 expectedSubsets := []v1.EndpointSubset{{
2368 Addresses: []v1.EndpointAddress{
2369 {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
2370 },
2371 }}
2372 endpoint := &v1.Endpoints{
2373 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
2374 Subsets: expectedSubsets,
2375 }
2376
2377 controller := &endpointController{}
2378 blockDelete := make(chan struct{})
2379 blockNextAction := make(chan struct{})
2380 stopChan := make(chan struct{})
2381 testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
2382 defer testServer.Close()
2383
2384 tCtx := ktesting.Init(t)
2385 *controller = *newController(tCtx, testServer.URL, 0*time.Second)
2386 addPods(controller.podStore, ns, 1, 1, 0, ipv4only)
2387
2388 go func() { controller.Run(tCtx, 1) }()
2389
2390 svc := &v1.Service{
2391 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
2392 Spec: v1.ServiceSpec{
2393 Selector: map[string]string{"foo": "bar"},
2394 ClusterIP: "None",
2395 Ports: nil,
2396 },
2397 }
2398
2399 controller.serviceStore.Add(svc)
2400 controller.onServiceUpdate(svc)
2401
2402 waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Add should have caused a request to be sent to the test server")
2403
2404 controller.serviceStore.Delete(svc)
2405 controller.onServiceDelete(svc)
2406 waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Delete should have caused a request to be sent to the test server")
2407
2408
2409
2410 controller.serviceStore.Add(svc)
2411 controller.onServiceUpdate(svc)
2412
2413
2414 wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
2415 return controller.queue.Len() == 0, nil
2416 })
2417
2418
2419 close(blockDelete)
2420 waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoint should have been recreated")
2421
2422 close(blockNextAction)
2423 close(stopChan)
2424 }
2425
2426 func TestSyncServiceAddresses(t *testing.T) {
2427 makeService := func(tolerateUnready bool) *v1.Service {
2428 return &v1.Service{
2429 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"},
2430 Spec: v1.ServiceSpec{
2431 Selector: map[string]string{"foo": "bar"},
2432 PublishNotReadyAddresses: tolerateUnready,
2433 Type: v1.ServiceTypeClusterIP,
2434 ClusterIP: "1.1.1.1",
2435 Ports: []v1.ServicePort{{Port: 80}},
2436 },
2437 }
2438 }
2439
2440 makePod := func(phase v1.PodPhase, isReady bool, terminating bool) *v1.Pod {
2441 statusCondition := v1.ConditionFalse
2442 if isReady {
2443 statusCondition = v1.ConditionTrue
2444 }
2445
2446 now := metav1.Now()
2447 deletionTimestamp := &now
2448 if !terminating {
2449 deletionTimestamp = nil
2450 }
2451 return &v1.Pod{
2452 ObjectMeta: metav1.ObjectMeta{
2453 Namespace: "ns",
2454 Name: "fakepod",
2455 DeletionTimestamp: deletionTimestamp,
2456 Labels: map[string]string{"foo": "bar"},
2457 },
2458 Spec: v1.PodSpec{
2459 Containers: []v1.Container{{Ports: []v1.ContainerPort{
2460 {Name: "port1", ContainerPort: int32(8080)},
2461 }}},
2462 },
2463 Status: v1.PodStatus{
2464 Phase: phase,
2465 Conditions: []v1.PodCondition{
2466 {
2467 Type: v1.PodReady,
2468 Status: statusCondition,
2469 },
2470 },
2471 PodIP: "10.1.1.1",
2472 PodIPs: []v1.PodIP{
2473 {IP: "10.1.1.1"},
2474 },
2475 },
2476 }
2477 }
2478
2479 testCases := []struct {
2480 name string
2481 pod *v1.Pod
2482 service *v1.Service
2483 expectedReady int
2484 expectedUnready int
2485 }{
2486 {
2487 name: "pod running phase",
2488 pod: makePod(v1.PodRunning, true, false),
2489 service: makeService(false),
2490 expectedReady: 1,
2491 expectedUnready: 0,
2492 },
2493 {
2494 name: "pod running phase being deleted",
2495 pod: makePod(v1.PodRunning, true, true),
2496 service: makeService(false),
2497 expectedReady: 0,
2498 expectedUnready: 0,
2499 },
2500 {
2501 name: "pod unknown phase container ready",
2502 pod: makePod(v1.PodUnknown, true, false),
2503 service: makeService(false),
2504 expectedReady: 1,
2505 expectedUnready: 0,
2506 },
2507 {
2508 name: "pod unknown phase container ready being deleted",
2509 pod: makePod(v1.PodUnknown, true, true),
2510 service: makeService(false),
2511 expectedReady: 0,
2512 expectedUnready: 0,
2513 },
2514 {
2515 name: "pod pending phase container ready",
2516 pod: makePod(v1.PodPending, true, false),
2517 service: makeService(false),
2518 expectedReady: 1,
2519 expectedUnready: 0,
2520 },
2521 {
2522 name: "pod pending phase container ready being deleted",
2523 pod: makePod(v1.PodPending, true, true),
2524 service: makeService(false),
2525 expectedReady: 0,
2526 expectedUnready: 0,
2527 },
2528 {
2529 name: "pod unknown phase container not ready",
2530 pod: makePod(v1.PodUnknown, false, false),
2531 service: makeService(false),
2532 expectedReady: 0,
2533 expectedUnready: 1,
2534 },
2535 {
2536 name: "pod pending phase container not ready",
2537 pod: makePod(v1.PodPending, false, false),
2538 service: makeService(false),
2539 expectedReady: 0,
2540 expectedUnready: 1,
2541 },
2542 {
2543 name: "pod failed phase",
2544 pod: makePod(v1.PodFailed, false, false),
2545 service: makeService(false),
2546 expectedReady: 0,
2547 expectedUnready: 0,
2548 },
2549 {
2550 name: "pod succeeded phase",
2551 pod: makePod(v1.PodSucceeded, false, false),
2552 service: makeService(false),
2553 expectedReady: 0,
2554 expectedUnready: 0,
2555 },
2556 {
2557 name: "pod running phase and tolerate unready",
2558 pod: makePod(v1.PodRunning, false, false),
2559 service: makeService(true),
2560 expectedReady: 1,
2561 expectedUnready: 0,
2562 },
2563 {
2564 name: "pod running phase and tolerate unready being deleted",
2565 pod: makePod(v1.PodRunning, false, true),
2566 service: makeService(true),
2567 expectedReady: 1,
2568 expectedUnready: 0,
2569 },
2570 {
2571 name: "pod unknown phase and tolerate unready",
2572 pod: makePod(v1.PodUnknown, false, false),
2573 service: makeService(true),
2574 expectedReady: 1,
2575 expectedUnready: 0,
2576 },
2577 {
2578 name: "pod unknown phase and tolerate unready being deleted",
2579 pod: makePod(v1.PodUnknown, false, true),
2580 service: makeService(true),
2581 expectedReady: 1,
2582 expectedUnready: 0,
2583 },
2584 {
2585 name: "pod pending phase and tolerate unready",
2586 pod: makePod(v1.PodPending, false, false),
2587 service: makeService(true),
2588 expectedReady: 1,
2589 expectedUnready: 0,
2590 },
2591 {
2592 name: "pod pending phase and tolerate unready being deleted",
2593 pod: makePod(v1.PodPending, false, true),
2594 service: makeService(true),
2595 expectedReady: 1,
2596 expectedUnready: 0,
2597 },
2598 {
2599 name: "pod failed phase and tolerate unready",
2600 pod: makePod(v1.PodFailed, false, false),
2601 service: makeService(true),
2602 expectedReady: 0,
2603 expectedUnready: 0,
2604 },
2605 {
2606 name: "pod succeeded phase and tolerate unready endpoints",
2607 pod: makePod(v1.PodSucceeded, false, false),
2608 service: makeService(true),
2609 expectedReady: 0,
2610 expectedUnready: 0,
2611 },
2612 }
2613
2614 for _, tc := range testCases {
2615 t.Run(tc.name, func(t *testing.T) {
2616 tCtx := ktesting.Init(t)
2617
2618 ns := tc.service.Namespace
2619 client, c := newFakeController(tCtx, 0*time.Second)
2620
2621 err := c.podStore.Add(tc.pod)
2622 if err != nil {
2623 t.Errorf("Unexpected error adding pod %v", err)
2624 }
2625 err = c.serviceStore.Add(tc.service)
2626 if err != nil {
2627 t.Errorf("Unexpected error adding service %v", err)
2628 }
2629 err = c.syncService(tCtx, fmt.Sprintf("%s/%s", ns, tc.service.Name))
2630 if err != nil {
2631 t.Errorf("Unexpected error syncing service %v", err)
2632 }
2633
2634 endpoints, err := client.CoreV1().Endpoints(ns).Get(tCtx, tc.service.Name, metav1.GetOptions{})
2635 if err != nil {
2636 t.Errorf("Unexpected error %v", err)
2637 }
2638
2639 readyEndpoints := 0
2640 unreadyEndpoints := 0
2641 for _, subset := range endpoints.Subsets {
2642 readyEndpoints += len(subset.Addresses)
2643 unreadyEndpoints += len(subset.NotReadyAddresses)
2644 }
2645
2646 if tc.expectedReady != readyEndpoints {
2647 t.Errorf("Expected %d ready endpoints, got %d", tc.expectedReady, readyEndpoints)
2648 }
2649
2650 if tc.expectedUnready != unreadyEndpoints {
2651 t.Errorf("Expected %d ready endpoints, got %d", tc.expectedUnready, unreadyEndpoints)
2652 }
2653 })
2654 }
2655 }
2656
2657 func TestEndpointsDeletionEvents(t *testing.T) {
2658 ns := metav1.NamespaceDefault
2659 testServer, _ := makeTestServer(t, ns)
2660 defer testServer.Close()
2661
2662 tCtx := ktesting.Init(t)
2663 controller := newController(tCtx, testServer.URL, 0)
2664 store := controller.endpointsStore
2665 ep1 := &v1.Endpoints{
2666 ObjectMeta: metav1.ObjectMeta{
2667 Name: "foo",
2668 Namespace: ns,
2669 ResourceVersion: "rv1",
2670 },
2671 }
2672
2673
2674 store.Delete(ep1)
2675 controller.onEndpointsDelete(ep1)
2676
2677 if controller.queue.Len() != 1 {
2678 t.Errorf("Expected one service to be in the queue, found %d", controller.queue.Len())
2679 }
2680 }
2681
2682 func stringVal(str *string) string {
2683 if str == nil {
2684 return "nil"
2685 }
2686 return *str
2687 }
2688
2689
2690 func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan struct{}, errorMsg string) {
2691 timer := time.NewTimer(timeout)
2692 select {
2693 case <-timer.C:
2694 t.Errorf(errorMsg)
2695 case <-receivingChan:
2696 }
2697 }
2698
2699 func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) {
2700 copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset {
2701 newSubSet := orig.DeepCopy()
2702 mutator(newSubSet)
2703 return newSubSet
2704 }
2705 es1 := &v1.EndpointSubset{
2706 Addresses: []v1.EndpointAddress{
2707 {
2708 IP: "1.1.1.1",
2709 TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"},
2710 },
2711 },
2712 NotReadyAddresses: []v1.EndpointAddress{
2713 {
2714 IP: "1.1.1.2",
2715 TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"},
2716 },
2717 },
2718 Ports: []v1.EndpointPort{{Port: 8081, Protocol: "TCP"}},
2719 }
2720 es2 := &v1.EndpointSubset{
2721 Addresses: []v1.EndpointAddress{
2722 {
2723 IP: "2.2.2.1",
2724 TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"},
2725 },
2726 },
2727 NotReadyAddresses: []v1.EndpointAddress{
2728 {
2729 IP: "2.2.2.2",
2730 TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"},
2731 },
2732 },
2733 Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}},
2734 }
2735 tests := []struct {
2736 name string
2737 subsets1 []v1.EndpointSubset
2738 subsets2 []v1.EndpointSubset
2739 expected bool
2740 }{
2741 {
2742 name: "Subsets removed",
2743 subsets1: []v1.EndpointSubset{*es1, *es2},
2744 subsets2: []v1.EndpointSubset{*es1},
2745 expected: false,
2746 },
2747 {
2748 name: "Ready Pod IP changed",
2749 subsets1: []v1.EndpointSubset{*es1, *es2},
2750 subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
2751 es.Addresses[0].IP = "1.1.1.10"
2752 }), *es2},
2753 expected: false,
2754 },
2755 {
2756 name: "NotReady Pod IP changed",
2757 subsets1: []v1.EndpointSubset{*es1, *es2},
2758 subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
2759 es.NotReadyAddresses[0].IP = "2.2.2.10"
2760 })},
2761 expected: false,
2762 },
2763 {
2764 name: "Pod ResourceVersion changed",
2765 subsets1: []v1.EndpointSubset{*es1, *es2},
2766 subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
2767 es.Addresses[0].TargetRef.ResourceVersion = "100"
2768 })},
2769 expected: true,
2770 },
2771 {
2772 name: "Pod ResourceVersion removed",
2773 subsets1: []v1.EndpointSubset{*es1, *es2},
2774 subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
2775 es.Addresses[0].TargetRef.ResourceVersion = ""
2776 })},
2777 expected: true,
2778 },
2779 {
2780 name: "Ports changed",
2781 subsets1: []v1.EndpointSubset{*es1, *es2},
2782 subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
2783 es.Ports[0].Port = 8082
2784 })},
2785 expected: false,
2786 },
2787 }
2788 for _, tt := range tests {
2789 t.Run(tt.name, func(t *testing.T) {
2790 if got := endpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected {
2791 t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, tt.expected)
2792 }
2793 })
2794 }
2795 }
2796
View as plain text