1
16
17 package endpointslice
18
19 import (
20 "context"
21 "fmt"
22 "sort"
23 "testing"
24 "time"
25
26 corev1 "k8s.io/api/core/v1"
27 discovery "k8s.io/api/discovery/v1"
28 apiequality "k8s.io/apimachinery/pkg/api/equality"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/client-go/informers"
32 clientset "k8s.io/client-go/kubernetes"
33 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
34 "k8s.io/kubernetes/pkg/controller/endpoint"
35 "k8s.io/kubernetes/pkg/controller/endpointslice"
36 "k8s.io/kubernetes/pkg/controller/endpointslicemirroring"
37 "k8s.io/kubernetes/test/integration/framework"
38 "k8s.io/kubernetes/test/utils/ktesting"
39 )
40
41 func TestEndpointSliceMirroring(t *testing.T) {
42
43 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
44 defer server.TearDownFn()
45
46 client, err := clientset.NewForConfig(server.ClientConfig)
47 if err != nil {
48 t.Fatalf("Error creating clientset: %v", err)
49 }
50
51 tCtx := ktesting.Init(t)
52 resyncPeriod := 12 * time.Hour
53 informers := informers.NewSharedInformerFactory(client, resyncPeriod)
54
55 epController := endpoint.NewEndpointController(
56 tCtx,
57 informers.Core().V1().Pods(),
58 informers.Core().V1().Services(),
59 informers.Core().V1().Endpoints(),
60 client,
61 1*time.Second)
62
63 epsController := endpointslice.NewController(
64 tCtx,
65 informers.Core().V1().Pods(),
66 informers.Core().V1().Services(),
67 informers.Core().V1().Nodes(),
68 informers.Discovery().V1().EndpointSlices(),
69 int32(100),
70 client,
71 1*time.Second)
72
73 epsmController := endpointslicemirroring.NewController(
74 tCtx,
75 informers.Core().V1().Endpoints(),
76 informers.Discovery().V1().EndpointSlices(),
77 informers.Core().V1().Services(),
78 int32(100),
79 client,
80 1*time.Second)
81
82
83 informers.Start(tCtx.Done())
84 go epController.Run(tCtx, 5)
85 go epsController.Run(tCtx, 5)
86 go epsmController.Run(tCtx, 5)
87
88 testCases := []struct {
89 testName string
90 service *corev1.Service
91 customEndpoints *corev1.Endpoints
92 expectEndpointSlice int
93 expectEndpointSliceManagedBy string
94 }{{
95 testName: "Service with selector",
96 service: &corev1.Service{
97 ObjectMeta: metav1.ObjectMeta{
98 Name: "test-123",
99 },
100 Spec: corev1.ServiceSpec{
101 Ports: []corev1.ServicePort{{
102 Port: int32(80),
103 }},
104 Selector: map[string]string{
105 "foo": "bar",
106 },
107 },
108 },
109 expectEndpointSlice: 1,
110 expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io",
111 }, {
112 testName: "Service without selector",
113 service: &corev1.Service{
114 ObjectMeta: metav1.ObjectMeta{
115 Name: "test-123",
116 },
117 Spec: corev1.ServiceSpec{
118 Ports: []corev1.ServicePort{{
119 Port: int32(80),
120 }},
121 },
122 },
123 customEndpoints: &corev1.Endpoints{
124 ObjectMeta: metav1.ObjectMeta{
125 Name: "test-123",
126 },
127 Subsets: []corev1.EndpointSubset{{
128 Ports: []corev1.EndpointPort{{
129 Port: 80,
130 }},
131 Addresses: []corev1.EndpointAddress{{
132 IP: "10.0.0.1",
133 }},
134 }},
135 },
136 expectEndpointSlice: 1,
137 expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
138 }, {
139 testName: "Service without selector Endpoint multiple subsets and same address",
140 service: &corev1.Service{
141 ObjectMeta: metav1.ObjectMeta{
142 Name: "test-123",
143 },
144 Spec: corev1.ServiceSpec{
145 Ports: []corev1.ServicePort{{
146 Port: int32(80),
147 }},
148 },
149 },
150 customEndpoints: &corev1.Endpoints{
151 ObjectMeta: metav1.ObjectMeta{
152 Name: "test-123",
153 },
154 Subsets: []corev1.EndpointSubset{
155 {
156 Ports: []corev1.EndpointPort{{
157 Name: "port1",
158 Port: 80,
159 }},
160 Addresses: []corev1.EndpointAddress{{
161 IP: "10.0.0.1",
162 }},
163 },
164 {
165 Ports: []corev1.EndpointPort{{
166 Name: "port2",
167 Port: 90,
168 }},
169 Addresses: []corev1.EndpointAddress{{
170 IP: "10.0.0.1",
171 }},
172 },
173 },
174 },
175 expectEndpointSlice: 1,
176 expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
177 }, {
178 testName: "Service without selector Endpoint multiple subsets",
179 service: &corev1.Service{
180 ObjectMeta: metav1.ObjectMeta{
181 Name: "test-123",
182 },
183 Spec: corev1.ServiceSpec{
184 Ports: []corev1.ServicePort{{
185 Port: int32(80),
186 }},
187 },
188 },
189 customEndpoints: &corev1.Endpoints{
190 ObjectMeta: metav1.ObjectMeta{
191 Name: "test-123",
192 },
193 Subsets: []corev1.EndpointSubset{
194 {
195 Ports: []corev1.EndpointPort{{
196 Name: "port1",
197 Port: 80,
198 }},
199 Addresses: []corev1.EndpointAddress{{
200 IP: "10.0.0.1",
201 }},
202 },
203 {
204 Ports: []corev1.EndpointPort{{
205 Name: "port2",
206 Port: 90,
207 }},
208 Addresses: []corev1.EndpointAddress{{
209 IP: "10.0.0.2",
210 }},
211 },
212 },
213 },
214 expectEndpointSlice: 2,
215 expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
216 }, {
217 testName: "Service without Endpoints",
218 service: &corev1.Service{
219 ObjectMeta: metav1.ObjectMeta{
220 Name: "test-123",
221 },
222 Spec: corev1.ServiceSpec{
223 Ports: []corev1.ServicePort{{
224 Port: int32(80),
225 }},
226 Selector: map[string]string{
227 "foo": "bar",
228 },
229 },
230 },
231 customEndpoints: nil,
232 expectEndpointSlice: 1,
233 expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io",
234 }, {
235 testName: "Endpoints without Service",
236 service: nil,
237 customEndpoints: &corev1.Endpoints{
238 ObjectMeta: metav1.ObjectMeta{
239 Name: "test-123",
240 },
241 Subsets: []corev1.EndpointSubset{{
242 Ports: []corev1.EndpointPort{{
243 Port: 80,
244 }},
245 Addresses: []corev1.EndpointAddress{{
246 IP: "10.0.0.1",
247 }},
248 }},
249 },
250 expectEndpointSlice: 0,
251 }}
252
253 for i, tc := range testCases {
254 t.Run(tc.testName, func(t *testing.T) {
255 ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-mirroring-%d", i), t)
256 defer framework.DeleteNamespaceOrDie(client, ns, t)
257
258 resourceName := ""
259 if tc.service != nil {
260 resourceName = tc.service.Name
261 tc.service.Namespace = ns.Name
262 _, err = client.CoreV1().Services(ns.Name).Create(tCtx, tc.service, metav1.CreateOptions{})
263 if err != nil {
264 t.Fatalf("Error creating service: %v", err)
265 }
266 }
267
268 if tc.customEndpoints != nil {
269 resourceName = tc.customEndpoints.Name
270 tc.customEndpoints.Namespace = ns.Name
271 _, err = client.CoreV1().Endpoints(ns.Name).Create(tCtx, tc.customEndpoints, metav1.CreateOptions{})
272 if err != nil {
273 t.Fatalf("Error creating endpoints: %v", err)
274 }
275 }
276
277 err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
278 lSelector := discovery.LabelServiceName + "=" + resourceName
279 esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
280 if err != nil {
281 t.Logf("Error listing EndpointSlices: %v", err)
282 return false, err
283 }
284
285 if tc.expectEndpointSlice > 0 {
286 if len(esList.Items) < tc.expectEndpointSlice {
287 t.Logf("Waiting for EndpointSlice to be created")
288 return false, nil
289 }
290 if len(esList.Items) != tc.expectEndpointSlice {
291 return false, fmt.Errorf("Only expected %d EndpointSlice, got %d", tc.expectEndpointSlice, len(esList.Items))
292 }
293 endpointSlice := esList.Items[0]
294 if tc.expectEndpointSliceManagedBy != "" {
295 if endpointSlice.Labels[discovery.LabelManagedBy] != tc.expectEndpointSliceManagedBy {
296 return false, fmt.Errorf("Expected EndpointSlice to be managed by %s, got %s", tc.expectEndpointSliceManagedBy, endpointSlice.Labels[discovery.LabelManagedBy])
297 }
298 }
299 } else if len(esList.Items) > 0 {
300 t.Logf("Waiting for EndpointSlices to be removed, still %d", len(esList.Items))
301 return false, nil
302 }
303
304 return true, nil
305 })
306 if err != nil {
307 t.Fatalf("Timed out waiting for conditions: %v", err)
308 }
309 })
310 }
311
312 }
313
314 func TestEndpointSliceMirroringUpdates(t *testing.T) {
315
316 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
317 defer server.TearDownFn()
318
319 client, err := clientset.NewForConfig(server.ClientConfig)
320 if err != nil {
321 t.Fatalf("Error creating clientset: %v", err)
322 }
323
324 resyncPeriod := 12 * time.Hour
325 informers := informers.NewSharedInformerFactory(client, resyncPeriod)
326
327 tCtx := ktesting.Init(t)
328 epsmController := endpointslicemirroring.NewController(
329 tCtx,
330 informers.Core().V1().Endpoints(),
331 informers.Discovery().V1().EndpointSlices(),
332 informers.Core().V1().Services(),
333 int32(100),
334 client,
335 1*time.Second)
336
337
338 informers.Start(tCtx.Done())
339 go epsmController.Run(tCtx, 1)
340
341 testCases := []struct {
342 testName string
343 tweakEndpoint func(ep *corev1.Endpoints)
344 }{
345 {
346 testName: "Update labels",
347 tweakEndpoint: func(ep *corev1.Endpoints) {
348 ep.Labels["foo"] = "bar"
349 },
350 },
351 {
352 testName: "Update annotations",
353 tweakEndpoint: func(ep *corev1.Endpoints) {
354 ep.Annotations["foo2"] = "bar2"
355 },
356 },
357 {
358 testName: "Update annotations but triggertime",
359 tweakEndpoint: func(ep *corev1.Endpoints) {
360 ep.Annotations["foo2"] = "bar2"
361 ep.Annotations[corev1.EndpointsLastChangeTriggerTime] = "date"
362 },
363 },
364 {
365 testName: "Update addresses",
366 tweakEndpoint: func(ep *corev1.Endpoints) {
367 ep.Subsets[0].Addresses = []corev1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "1.2.3.6"}}
368 },
369 },
370 }
371
372 for i, tc := range testCases {
373 t.Run(tc.testName, func(t *testing.T) {
374 ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-mirroring-%d", i), t)
375 defer framework.DeleteNamespaceOrDie(client, ns, t)
376
377 service := &corev1.Service{
378 ObjectMeta: metav1.ObjectMeta{
379 Name: "test-123",
380 Namespace: ns.Name,
381 },
382 Spec: corev1.ServiceSpec{
383 Ports: []corev1.ServicePort{{
384 Port: int32(80),
385 }},
386 },
387 }
388
389 customEndpoints := &corev1.Endpoints{
390 ObjectMeta: metav1.ObjectMeta{
391 Name: "test-123",
392 Namespace: ns.Name,
393 Labels: map[string]string{},
394 Annotations: map[string]string{},
395 },
396 Subsets: []corev1.EndpointSubset{{
397 Ports: []corev1.EndpointPort{{
398 Port: 80,
399 }},
400 Addresses: []corev1.EndpointAddress{{
401 IP: "10.0.0.1",
402 }},
403 }},
404 }
405
406 _, err = client.CoreV1().Services(ns.Name).Create(tCtx, service, metav1.CreateOptions{})
407 if err != nil {
408 t.Fatalf("Error creating service: %v", err)
409 }
410
411 _, err = client.CoreV1().Endpoints(ns.Name).Create(tCtx, customEndpoints, metav1.CreateOptions{})
412 if err != nil {
413 t.Fatalf("Error creating endpoints: %v", err)
414 }
415
416
417 tc.tweakEndpoint(customEndpoints)
418 _, err = client.CoreV1().Endpoints(ns.Name).Update(tCtx, customEndpoints, metav1.UpdateOptions{})
419 if err != nil {
420 t.Fatalf("Error updating endpoints: %v", err)
421 }
422
423
424 err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
425 lSelector := discovery.LabelServiceName + "=" + service.Name
426 esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
427 if err != nil {
428 t.Logf("Error listing EndpointSlices: %v", err)
429 return false, err
430 }
431
432 if len(esList.Items) == 0 {
433 t.Logf("Waiting for EndpointSlice to be created")
434 return false, nil
435 }
436
437 for _, endpointSlice := range esList.Items {
438 if endpointSlice.Labels[discovery.LabelManagedBy] != "endpointslicemirroring-controller.k8s.io" {
439 return false, fmt.Errorf("Expected EndpointSlice to be managed by endpointslicemirroring-controller.k8s.io, got %s", endpointSlice.Labels[discovery.LabelManagedBy])
440 }
441
442
443 epAddresses := []string{}
444 for _, address := range customEndpoints.Subsets[0].Addresses {
445 epAddresses = append(epAddresses, address.IP)
446 }
447
448 sliceAddresses := []string{}
449 for _, sliceEndpoint := range endpointSlice.Endpoints {
450 sliceAddresses = append(sliceAddresses, sliceEndpoint.Addresses...)
451 }
452
453 sort.Strings(epAddresses)
454 sort.Strings(sliceAddresses)
455
456 if !apiequality.Semantic.DeepEqual(epAddresses, sliceAddresses) {
457 t.Logf("Expected EndpointSlice to have the same IP addresses, expected %v got %v", epAddresses, sliceAddresses)
458 return false, nil
459 }
460
461
462 if !isSubset(customEndpoints.Labels, endpointSlice.Labels) {
463 t.Logf("Expected EndpointSlice to mirror labels, expected %v to be in received %v", customEndpoints.Labels, endpointSlice.Labels)
464 return false, nil
465 }
466
467
468 annotations := map[string]string{}
469 for k, v := range customEndpoints.Annotations {
470 if k == corev1.EndpointsLastChangeTriggerTime {
471 continue
472 }
473 annotations[k] = v
474 }
475 if !apiequality.Semantic.DeepEqual(annotations, endpointSlice.Annotations) {
476 t.Logf("Expected EndpointSlice to mirror annotations, expected %v received %v", customEndpoints.Annotations, endpointSlice.Annotations)
477 return false, nil
478 }
479 }
480 return true, nil
481 })
482 if err != nil {
483 t.Fatalf("Timed out waiting for conditions: %v", err)
484 }
485 })
486 }
487 }
488
489 func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
490
491 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
492 defer server.TearDownFn()
493
494 client, err := clientset.NewForConfig(server.ClientConfig)
495 if err != nil {
496 t.Fatalf("Error creating clientset: %v", err)
497 }
498
499 resyncPeriod := 12 * time.Hour
500 informers := informers.NewSharedInformerFactory(client, resyncPeriod)
501
502 tCtx := ktesting.Init(t)
503 epsmController := endpointslicemirroring.NewController(
504 tCtx,
505 informers.Core().V1().Endpoints(),
506 informers.Discovery().V1().EndpointSlices(),
507 informers.Core().V1().Services(),
508 int32(100),
509 client,
510 1*time.Second)
511
512
513 informers.Start(tCtx.Done())
514 go epsmController.Run(tCtx, 1)
515
516 testCases := []struct {
517 testName string
518 startingSelector map[string]string
519 startingMirroredSlices int
520 endingSelector map[string]string
521 endingMirroredSlices int
522 }{
523 {
524 testName: "nil -> {foo: bar} selector",
525 startingSelector: nil,
526 startingMirroredSlices: 1,
527 endingSelector: map[string]string{"foo": "bar"},
528 endingMirroredSlices: 0,
529 },
530 {
531 testName: "{foo: bar} -> nil selector",
532 startingSelector: map[string]string{"foo": "bar"},
533 startingMirroredSlices: 0,
534 endingSelector: nil,
535 endingMirroredSlices: 1,
536 },
537 {
538 testName: "{} -> {foo: bar} selector",
539 startingSelector: map[string]string{},
540 startingMirroredSlices: 1,
541 endingSelector: map[string]string{"foo": "bar"},
542 endingMirroredSlices: 0,
543 },
544 {
545 testName: "{foo: bar} -> {} selector",
546 startingSelector: map[string]string{"foo": "bar"},
547 startingMirroredSlices: 0,
548 endingSelector: map[string]string{},
549 endingMirroredSlices: 1,
550 },
551 }
552
553 for i, tc := range testCases {
554 t.Run(tc.testName, func(t *testing.T) {
555 ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-mirroring-%d", i), t)
556 defer framework.DeleteNamespaceOrDie(client, ns, t)
557 meta := metav1.ObjectMeta{Name: "test-123", Namespace: ns.Name}
558
559 service := &corev1.Service{
560 ObjectMeta: meta,
561 Spec: corev1.ServiceSpec{
562 Ports: []corev1.ServicePort{{
563 Port: int32(80),
564 }},
565 Selector: tc.startingSelector,
566 },
567 }
568
569 customEndpoints := &corev1.Endpoints{
570 ObjectMeta: meta,
571 Subsets: []corev1.EndpointSubset{{
572 Ports: []corev1.EndpointPort{{
573 Port: 80,
574 }},
575 Addresses: []corev1.EndpointAddress{{
576 IP: "10.0.0.1",
577 }},
578 }},
579 }
580
581 _, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
582 if err != nil {
583 t.Fatalf("Error creating service: %v", err)
584 }
585
586 _, err = client.CoreV1().Endpoints(ns.Name).Create(context.TODO(), customEndpoints, metav1.CreateOptions{})
587 if err != nil {
588 t.Fatalf("Error creating endpoints: %v", err)
589 }
590
591
592 err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.startingMirroredSlices)
593 if err != nil {
594 t.Fatalf("Timed out waiting for initial mirrored slices to match expectations: %v", err)
595 }
596
597 service.Spec.Selector = tc.endingSelector
598 _, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), service, metav1.UpdateOptions{})
599 if err != nil {
600 t.Fatalf("Error updating service: %v", err)
601 }
602
603
604 err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.endingMirroredSlices)
605 if err != nil {
606 t.Fatalf("Timed out waiting for final mirrored slices to match expectations: %v", err)
607 }
608 })
609 }
610 }
611
612 func waitForMirroredSlices(t *testing.T, client *clientset.Clientset, nsName, svcName string, num int) error {
613 t.Helper()
614 return wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
615 lSelector := discovery.LabelServiceName + "=" + svcName
616 lSelector += "," + discovery.LabelManagedBy + "=endpointslicemirroring-controller.k8s.io"
617 esList, err := client.DiscoveryV1().EndpointSlices(nsName).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector})
618 if err != nil {
619 t.Logf("Error listing EndpointSlices: %v", err)
620 return false, err
621 }
622
623 if len(esList.Items) != num {
624 t.Logf("Expected %d slices to be mirrored, got %d", num, len(esList.Items))
625 return false, nil
626 }
627
628 return true, nil
629 })
630 }
631
632
633 func isSubset(a, b map[string]string) bool {
634 if len(a) > len(b) {
635 return false
636 }
637 for k, v1 := range a {
638 if v2, ok := b[k]; !ok || v1 != v2 {
639 return false
640 }
641 }
642 return true
643 }
644
View as plain text