1 package externalworkload
2
3 import (
4 "context"
5 "fmt"
6 "reflect"
7 "sort"
8 "testing"
9 "time"
10
11 ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
12 "github.com/linkerd/linkerd2/controller/k8s"
13 v1 "k8s.io/api/core/v1"
14 discoveryv1 "k8s.io/api/discovery/v1"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/apimachinery/pkg/runtime/schema"
17 "k8s.io/apimachinery/pkg/types"
18 "k8s.io/apimachinery/pkg/util/intstr"
19 k8stesting "k8s.io/client-go/testing"
20 "k8s.io/client-go/tools/cache"
21 "k8s.io/utils/ptr"
22 )
23
24 type endpointSliceController struct {
25 *EndpointsController
26 endpointSliceStore cache.Store
27 externalWorkloadsStore cache.Store
28 serviceStore cache.Store
29 }
30
31 func newController(t *testing.T) (*k8s.API, func() []k8stesting.Action, *endpointSliceController) {
32 t.Helper()
33
34 k8sAPI, actions, err := k8s.NewFakeAPIWithActions()
35 if err != nil {
36 t.Fatalf("unexpected error %v", err)
37 }
38
39 esController, err := NewEndpointsController(k8sAPI, "hostname", "linkerd", make(chan struct{}), false)
40 if err != nil {
41 t.Fatalf("unexpected error %v", err)
42 }
43
44 return k8sAPI, actions, &endpointSliceController{
45 esController,
46 k8sAPI.ES().Informer().GetStore(),
47 k8sAPI.ExtWorkload().Informer().GetStore(),
48 k8sAPI.Svc().Informer().GetStore(),
49 }
50
51 }
52
53 func newExternalWorkload(n int, namespace string, ready bool, terminating bool) *ewv1beta1.ExternalWorkload {
54 status := ewv1beta1.ConditionTrue
55 if !ready {
56 status = ewv1beta1.ConditionFalse
57 }
58
59 var deletionTimestamp *metav1.Time
60 if terminating {
61 deletionTimestamp = &metav1.Time{
62 Time: time.Now(),
63 }
64 }
65
66 ew := &ewv1beta1.ExternalWorkload{
67 TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
68 ObjectMeta: metav1.ObjectMeta{
69 Namespace: namespace,
70 Name: fmt.Sprintf("ew-%d", n),
71 Labels: map[string]string{"foo": "bar"},
72 DeletionTimestamp: deletionTimestamp,
73 ResourceVersion: fmt.Sprint(n),
74 },
75 Spec: ewv1beta1.ExternalWorkloadSpec{
76 Ports: []ewv1beta1.PortSpec{
77 {
78 Name: "name",
79 Port: 444,
80 },
81 },
82 WorkloadIPs: []ewv1beta1.WorkloadIP{
83 {Ip: "1.2.3.4"},
84 },
85 },
86 Status: ewv1beta1.ExternalWorkloadStatus{
87 Conditions: []ewv1beta1.WorkloadCondition{
88 {
89 Type: ewv1beta1.WorkloadReady,
90 Status: status,
91 },
92 },
93 },
94 }
95
96 return ew
97 }
98
99
100 func TestSyncServiceNoSelector(t *testing.T) {
101 ns := metav1.NamespaceDefault
102 serviceName := "testing-1"
103 _, actions, esController := newController(t)
104 esController.serviceStore.Add(&v1.Service{
105 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
106 Spec: v1.ServiceSpec{
107 Ports: []v1.ServicePort{{TargetPort: intstr.FromInt32(80)}},
108 },
109 })
110
111 err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
112 if err != nil {
113 t.Errorf("unexpected error: %s", err)
114 }
115
116 if len(actions()) != 0 {
117 t.Errorf("expected 0 actions, got: %d", len(actions()))
118 }
119 }
120
121 func TestServiceExternalNameTypeSync(t *testing.T) {
122 serviceName := "testing-1"
123 namespace := "zahari"
124
125 testCases := []struct {
126 desc string
127 service *v1.Service
128 }{
129 {
130 desc: "External name with selector and ports should not receive endpoint slices",
131 service: &v1.Service{
132 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
133 Spec: v1.ServiceSpec{
134 Selector: map[string]string{"foo": "bar"},
135 Ports: []v1.ServicePort{{Port: 80}},
136 Type: v1.ServiceTypeExternalName,
137 },
138 },
139 },
140 {
141 desc: "External name with ports should not receive endpoint slices",
142 service: &v1.Service{
143 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
144 Spec: v1.ServiceSpec{
145 Ports: []v1.ServicePort{{Port: 80}},
146 Type: v1.ServiceTypeExternalName,
147 },
148 },
149 },
150 {
151 desc: "External name with selector should not receive endpoint slices",
152 service: &v1.Service{
153 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
154 Spec: v1.ServiceSpec{
155 Selector: map[string]string{"foo": "bar"},
156 Type: v1.ServiceTypeExternalName,
157 },
158 },
159 },
160 {
161 desc: "External name without selector and ports should not receive endpoint slices",
162 service: &v1.Service{
163 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
164 Spec: v1.ServiceSpec{
165 Type: v1.ServiceTypeExternalName,
166 },
167 },
168 },
169 }
170
171 for _, tc := range testCases {
172 t.Run(tc.desc, func(t *testing.T) {
173 client, actions, esController := newController(t)
174 ew := newExternalWorkload(1, namespace, true, false)
175 err := esController.externalWorkloadsStore.Add(ew)
176 if err != nil {
177 t.Errorf("unexpected error: %s", err)
178 }
179
180 err = esController.serviceStore.Add(tc.service)
181 if err != nil {
182 t.Errorf("unexpected error: %s", err)
183 }
184
185 err = esController.syncService(fmt.Sprintf("%s/%s", namespace, serviceName))
186 if err != nil {
187 t.Errorf("unexpected error: %s", err)
188 }
189
190 if len(actions()) != 0 {
191 t.Errorf("expected 0 actions, got: %d", len(actions()))
192 }
193
194 sliceList, err := client.Client.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{})
195 if err != nil {
196 t.Errorf("unexpected error: %s", err)
197 }
198 if len(sliceList.Items) != 0 {
199 t.Errorf("Expected 0 endpoint slices, got: %d", len(sliceList.Items))
200 }
201 })
202 }
203 }
204
205
206 func TestSyncServicePendingDeletion(t *testing.T) {
207 ns := metav1.NamespaceDefault
208 serviceName := "testing-1"
209 deletionTimestamp := metav1.Now()
210 _, actions, esController := newController(t)
211 esController.serviceStore.Add(&v1.Service{
212 ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns, DeletionTimestamp: &deletionTimestamp},
213 Spec: v1.ServiceSpec{
214 Selector: map[string]string{"foo": "bar"},
215 Ports: []v1.ServicePort{{TargetPort: intstr.FromInt32(80)}},
216 },
217 })
218
219 err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
220 if err != nil {
221 t.Errorf("unexpected error: %s", err)
222 }
223 if len(actions()) != 0 {
224 t.Errorf("Expected 0 actions, got: %d", len(actions()))
225 }
226 }
227
228
229 func TestSyncServiceExternalWorkloadSelection(t *testing.T) {
230 client, actions, esController := newController(t)
231 ns := "test-ns"
232
233 ew1 := newExternalWorkload(1, ns, true, false)
234 esController.externalWorkloadsStore.Add(ew1)
235
236
237 ew2 := newExternalWorkload(2, ns, true, false)
238 ew2.Labels["foo"] = "boo"
239 esController.externalWorkloadsStore.Add(ew2)
240
241 standardSyncService(t, esController, ns, "testing-1")
242 expectActions(t, actions(), 1, "create", "endpointslices")
243
244
245 slices, err := client.Client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
246 if err != nil {
247 t.Errorf("Expected no error fetching endpoint slices, got: %s", err)
248 }
249 if len(slices.Items) != 1 {
250 t.Errorf("Expected 1 endpoint slices, got: %d", len(slices.Items))
251 }
252
253 slice := slices.Items[0]
254 if len(slice.Endpoints) != 1 {
255 t.Errorf("Expected 1 endpoint in first slice, got: %d", len(slice.Endpoints))
256 }
257 endpoint := slice.Endpoints[0]
258 if endpoint.TargetRef.Kind != "ExternalWorkload" || endpoint.TargetRef.Namespace != ns || endpoint.TargetRef.Name != ew1.Name {
259 t.Errorf("Expected endpoint to target ExternalWorkload")
260 }
261 }
262
263 func TestSyncServiceEndpointSlicePendingDeletion(t *testing.T) {
264 client, actions, esController := newController(t)
265 ns := "test-ns"
266 serviceName := "testing-1"
267 service := createService(t, esController, ns, serviceName)
268 err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
269 if err != nil {
270 t.Fatalf("Expected no error creating EndpointSlice: %v", err)
271 }
272
273 gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
274 ownerRef := metav1.NewControllerRef(service, gvk)
275
276 deletedTs := metav1.Now()
277 endpointSlice := &discoveryv1.EndpointSlice{
278 ObjectMeta: metav1.ObjectMeta{
279 Name: "epSlice-1",
280 Namespace: ns,
281 OwnerReferences: []metav1.OwnerReference{*ownerRef},
282 Labels: map[string]string{
283 discoveryv1.LabelServiceName: serviceName,
284 discoveryv1.LabelManagedBy: managedBy,
285 },
286 DeletionTimestamp: &deletedTs,
287 },
288 AddressType: discoveryv1.AddressTypeIPv4,
289 }
290 err = esController.endpointSliceStore.Add(endpointSlice)
291 if err != nil {
292 t.Fatalf("Expected no error adding EndpointSlice: %v", err)
293 }
294 _, err = client.Client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
295 if err != nil {
296 t.Fatalf("Expected no error creating EndpointSlice: %v", err)
297 }
298
299 numActionsBefore := len(actions())
300 err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
301 if err != nil {
302 t.Errorf("Expected no error syncing service, got: %s", err)
303 }
304
305
306
307 if len(actions()) != numActionsBefore {
308 t.Errorf("Expected 0 more actions, got %d", len(actions())-numActionsBefore)
309 }
310 }
311
312 func makeExternalWorkload(resVersion, name string, labels map[string]string, ports map[int32]string, ips []string) *ewv1beta1.ExternalWorkload {
313 portSpecs := []ewv1beta1.PortSpec{}
314 for port, name := range ports {
315 spec := ewv1beta1.PortSpec{
316 Port: port,
317 }
318 if name != "" {
319 spec.Name = name
320 }
321 portSpecs = append(portSpecs, spec)
322 }
323
324 wIps := []ewv1beta1.WorkloadIP{}
325 for _, ip := range ips {
326 wIps = append(wIps, ewv1beta1.WorkloadIP{Ip: ip})
327 }
328
329 ew := &ewv1beta1.ExternalWorkload{
330 ObjectMeta: metav1.ObjectMeta{
331 Name: name,
332 Namespace: "ns",
333 Labels: labels,
334 ResourceVersion: resVersion,
335 },
336 Spec: ewv1beta1.ExternalWorkloadSpec{
337 MeshTLS: ewv1beta1.MeshTLS{
338 Identity: "some-identity",
339 ServerName: "some-sni",
340 },
341 Ports: portSpecs,
342 WorkloadIPs: wIps,
343 },
344 Status: ewv1beta1.ExternalWorkloadStatus{
345 Conditions: []ewv1beta1.WorkloadCondition{
346 {
347 Type: ewv1beta1.WorkloadReady,
348 Status: ewv1beta1.ConditionTrue,
349 },
350 },
351 },
352 }
353
354 ew.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ew.Namespace, ew.Name))
355 return ew
356 }
357
358 func TestSyncService(t *testing.T) {
359 creationTimestamp := metav1.Now()
360 namespace := "test-ns"
361 testcases := []struct {
362 name string
363 service *v1.Service
364 externalWorkloads []*ewv1beta1.ExternalWorkload
365 expectedEndpointPorts []discoveryv1.EndpointPort
366 expectedEndpoints []discoveryv1.Endpoint
367 }{
368 {
369 name: "EW with multiple IPs and Service with ipFamilies=ipv4",
370 service: &v1.Service{
371 ObjectMeta: metav1.ObjectMeta{
372 Name: "foobar",
373 Namespace: namespace,
374 CreationTimestamp: creationTimestamp,
375 },
376 Spec: v1.ServiceSpec{
377 Ports: []v1.ServicePort{
378 {Name: "tcp-example", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP},
379 {Name: "udp-example", TargetPort: intstr.FromInt32(161), Protocol: v1.ProtocolUDP},
380 {Name: "sctp-example", TargetPort: intstr.FromInt32(3456), Protocol: v1.ProtocolSCTP},
381 },
382 Selector: map[string]string{"foo": "bar"},
383 IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
384 },
385 },
386 externalWorkloads: []*ewv1beta1.ExternalWorkload{
387 {
388 ObjectMeta: metav1.ObjectMeta{
389 Namespace: namespace,
390 Name: "ew0",
391 Labels: map[string]string{"foo": "bar"},
392 DeletionTimestamp: nil,
393 },
394 Spec: ewv1beta1.ExternalWorkloadSpec{
395 WorkloadIPs: []ewv1beta1.WorkloadIP{
396 {
397 Ip: "10.0.0.1",
398 },
399 },
400 Ports: []ewv1beta1.PortSpec{
401 {Name: "tcp-example", Port: 80, Protocol: v1.ProtocolTCP},
402 {Name: "udp-example", Port: 161, Protocol: v1.ProtocolUDP},
403 {Name: "sctp-example", Port: 3456, Protocol: v1.ProtocolSCTP},
404 },
405 },
406 Status: ewv1beta1.ExternalWorkloadStatus{
407 Conditions: []ewv1beta1.WorkloadCondition{
408 {
409 Type: ewv1beta1.WorkloadReady,
410 Status: ewv1beta1.ConditionTrue,
411 },
412 },
413 },
414 },
415 {
416 ObjectMeta: metav1.ObjectMeta{
417 Namespace: namespace,
418 Name: "ew1",
419 Labels: map[string]string{"foo": "bar"},
420 DeletionTimestamp: nil,
421 },
422
423 Spec: ewv1beta1.ExternalWorkloadSpec{
424 WorkloadIPs: []ewv1beta1.WorkloadIP{
425 {
426 Ip: "10.0.0.2",
427 },
428 {
429 Ip: "fd08::5678:0000:0000:9abc:def0",
430 },
431 },
432 Ports: []ewv1beta1.PortSpec{
433 {Name: "tcp-example", Port: 80, Protocol: v1.ProtocolTCP},
434 {Name: "udp-example", Port: 161, Protocol: v1.ProtocolUDP},
435 {Name: "sctp-example", Port: 3456, Protocol: v1.ProtocolSCTP},
436 },
437 },
438 Status: ewv1beta1.ExternalWorkloadStatus{
439 Conditions: []ewv1beta1.WorkloadCondition{
440 {
441 Type: ewv1beta1.WorkloadReady,
442 Status: ewv1beta1.ConditionTrue,
443 },
444 },
445 },
446 },
447 },
448 expectedEndpointPorts: []discoveryv1.EndpointPort{
449 {
450 Name: ptr.To("udp-example"),
451 Protocol: protoPtr(v1.ProtocolUDP),
452 Port: ptr.To(int32(161)),
453 },
454 {
455 Name: ptr.To("sctp-example"),
456 Protocol: protoPtr(v1.ProtocolSCTP),
457 Port: ptr.To(int32(3456)),
458 },
459 {
460 Name: ptr.To("tcp-example"),
461 Protocol: protoPtr(v1.ProtocolTCP),
462 Port: ptr.To(int32(80)),
463 },
464 },
465 expectedEndpoints: []discoveryv1.Endpoint{
466 {
467 Conditions: discoveryv1.EndpointConditions{
468 Ready: ptr.To(true),
469 Serving: ptr.To(true),
470 Terminating: ptr.To(false),
471 },
472 Addresses: []string{"10.0.0.1"},
473 TargetRef: &v1.ObjectReference{Kind: "ExternalWorkload", Namespace: namespace, Name: "ew0"},
474 },
475 {
476 Conditions: discoveryv1.EndpointConditions{
477 Ready: ptr.To(true),
478 Serving: ptr.To(true),
479 Terminating: ptr.To(false),
480 },
481 Addresses: []string{"10.0.0.2"},
482 TargetRef: &v1.ObjectReference{Kind: "ExternalWorkload", Namespace: namespace, Name: "ew1"},
483 },
484 },
485 },
486 {
487 name: "EWs with multiple IPs and Service with ipFamilies=ipv6",
488 service: &v1.Service{
489 ObjectMeta: metav1.ObjectMeta{
490 Name: "foobar",
491 Namespace: namespace,
492 CreationTimestamp: creationTimestamp,
493 },
494 Spec: v1.ServiceSpec{
495 Ports: []v1.ServicePort{
496 {Name: "tcp-example", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP},
497 {Name: "udp-example", TargetPort: intstr.FromInt32(161), Protocol: v1.ProtocolUDP},
498 {Name: "sctp-example", TargetPort: intstr.FromInt32(3456), Protocol: v1.ProtocolSCTP},
499 },
500 Selector: map[string]string{"foo": "bar"},
501 IPFamilies: []v1.IPFamily{v1.IPv6Protocol},
502 },
503 },
504 externalWorkloads: []*ewv1beta1.ExternalWorkload{
505 {
506 ObjectMeta: metav1.ObjectMeta{
507 Namespace: namespace,
508 Name: "ew0",
509 Labels: map[string]string{"foo": "bar"},
510 DeletionTimestamp: nil,
511 },
512 Spec: ewv1beta1.ExternalWorkloadSpec{
513 WorkloadIPs: []ewv1beta1.WorkloadIP{
514 {
515 Ip: "10.0.0.1",
516 },
517 },
518 Ports: []ewv1beta1.PortSpec{
519 {Name: "tcp-example", Port: 80, Protocol: v1.ProtocolTCP},
520 {Name: "udp-example", Port: 161, Protocol: v1.ProtocolUDP},
521 {Name: "sctp-example", Port: 3456, Protocol: v1.ProtocolSCTP},
522 },
523 },
524 Status: ewv1beta1.ExternalWorkloadStatus{
525 Conditions: []ewv1beta1.WorkloadCondition{
526 {
527 Type: ewv1beta1.WorkloadReady,
528 Status: ewv1beta1.ConditionTrue,
529 },
530 },
531 },
532 },
533 {
534 ObjectMeta: metav1.ObjectMeta{
535 Namespace: namespace,
536 Name: "ew1",
537 Labels: map[string]string{"foo": "bar"},
538 DeletionTimestamp: nil,
539 },
540 Spec: ewv1beta1.ExternalWorkloadSpec{
541 WorkloadIPs: []ewv1beta1.WorkloadIP{
542 {
543 Ip: "10.0.0.2",
544 },
545 {
546
547 Ip: "fd08::5678:0000:0000:9abc:def0",
548 },
549 },
550 Ports: []ewv1beta1.PortSpec{
551 {Name: "tcp-example", Port: 80, Protocol: v1.ProtocolTCP},
552 {Name: "udp-example", Port: 161, Protocol: v1.ProtocolUDP},
553 {Name: "sctp-example", Port: 3456, Protocol: v1.ProtocolSCTP},
554 },
555 },
556 Status: ewv1beta1.ExternalWorkloadStatus{
557 Conditions: []ewv1beta1.WorkloadCondition{
558 {
559 Type: ewv1beta1.WorkloadReady,
560 Status: ewv1beta1.ConditionTrue,
561 },
562 },
563 },
564 },
565 },
566 expectedEndpointPorts: []discoveryv1.EndpointPort{
567 {
568 Name: ptr.To("udp-example"),
569 Protocol: protoPtr(v1.ProtocolUDP),
570 Port: ptr.To(int32(161)),
571 },
572 {
573 Name: ptr.To("sctp-example"),
574 Protocol: protoPtr(v1.ProtocolSCTP),
575 Port: ptr.To(int32(3456)),
576 },
577 {
578 Name: ptr.To("tcp-example"),
579 Protocol: protoPtr(v1.ProtocolTCP),
580 Port: ptr.To(int32(80)),
581 },
582 },
583 expectedEndpoints: []discoveryv1.Endpoint{
584 {
585 Conditions: discoveryv1.EndpointConditions{
586 Ready: ptr.To(true),
587 Serving: ptr.To(true),
588 Terminating: ptr.To(false),
589 },
590 Addresses: []string{"fd08::5678:0000:0000:9abc:def0"},
591 TargetRef: &v1.ObjectReference{Kind: "ExternalWorkload", Namespace: namespace, Name: "ew1"},
592 },
593 },
594 },
595 {
596 name: "Not ready workloads",
597 service: &v1.Service{
598 ObjectMeta: metav1.ObjectMeta{
599 Name: "foobar",
600 Namespace: namespace,
601 CreationTimestamp: creationTimestamp,
602 },
603 Spec: v1.ServiceSpec{
604 Ports: []v1.ServicePort{
605 {Name: "tcp-example", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP},
606 {Name: "udp-example", TargetPort: intstr.FromInt32(161), Protocol: v1.ProtocolUDP},
607 {Name: "sctp-example", TargetPort: intstr.FromInt32(3456), Protocol: v1.ProtocolSCTP},
608 },
609 Selector: map[string]string{"foo": "bar"},
610 IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
611 },
612 },
613 externalWorkloads: []*ewv1beta1.ExternalWorkload{
614 {
615 ObjectMeta: metav1.ObjectMeta{
616 Namespace: namespace,
617 Name: "ew0",
618 Labels: map[string]string{"foo": "bar"},
619 DeletionTimestamp: nil,
620 },
621 Spec: ewv1beta1.ExternalWorkloadSpec{
622 WorkloadIPs: []ewv1beta1.WorkloadIP{
623 {
624 Ip: "10.0.0.1",
625 },
626 },
627 Ports: []ewv1beta1.PortSpec{
628 {Name: "tcp-example", Port: 80, Protocol: v1.ProtocolTCP},
629 {Name: "udp-example", Port: 161, Protocol: v1.ProtocolUDP},
630 {Name: "sctp-example", Port: 3456, Protocol: v1.ProtocolSCTP},
631 },
632 },
633 Status: ewv1beta1.ExternalWorkloadStatus{
634 Conditions: []ewv1beta1.WorkloadCondition{
635 {
636 Type: ewv1beta1.WorkloadReady,
637 Status: ewv1beta1.ConditionTrue,
638 },
639 },
640 },
641 },
642 {
643 ObjectMeta: metav1.ObjectMeta{
644 Namespace: namespace,
645 Name: "ew1",
646 Labels: map[string]string{"foo": "bar"},
647 DeletionTimestamp: nil,
648 },
649 Spec: ewv1beta1.ExternalWorkloadSpec{
650 WorkloadIPs: []ewv1beta1.WorkloadIP{
651 {
652 Ip: "10.0.0.2",
653 },
654 {
655
656 Ip: "fd08::5678:0000:0000:9abc:def0",
657 },
658 },
659 Ports: []ewv1beta1.PortSpec{
660 {Name: "tcp-example", Port: 80, Protocol: v1.ProtocolTCP},
661 {Name: "udp-example", Port: 161, Protocol: v1.ProtocolUDP},
662 {Name: "sctp-example", Port: 3456, Protocol: v1.ProtocolSCTP},
663 },
664 },
665 Status: ewv1beta1.ExternalWorkloadStatus{
666 Conditions: []ewv1beta1.WorkloadCondition{
667 {
668 Type: ewv1beta1.WorkloadReady,
669 Status: ewv1beta1.ConditionFalse,
670 },
671 },
672 },
673 },
674 },
675 expectedEndpointPorts: []discoveryv1.EndpointPort{
676 {
677 Name: ptr.To("udp-example"),
678 Protocol: protoPtr(v1.ProtocolUDP),
679 Port: ptr.To(int32(161)),
680 },
681 {
682 Name: ptr.To("sctp-example"),
683 Protocol: protoPtr(v1.ProtocolSCTP),
684 Port: ptr.To(int32(3456)),
685 },
686 {
687 Name: ptr.To("tcp-example"),
688 Protocol: protoPtr(v1.ProtocolTCP),
689 Port: ptr.To(int32(80)),
690 },
691 },
692 expectedEndpoints: []discoveryv1.Endpoint{
693 {
694 Conditions: discoveryv1.EndpointConditions{
695 Ready: ptr.To(true),
696 Serving: ptr.To(true),
697 Terminating: ptr.To(false),
698 },
699 Addresses: []string{"10.0.0.1"},
700 TargetRef: &v1.ObjectReference{Kind: "ExternalWorkload", Namespace: namespace, Name: "ew0"},
701 },
702 {
703 Conditions: discoveryv1.EndpointConditions{
704 Ready: ptr.To(false),
705 Serving: ptr.To(false),
706 Terminating: ptr.To(false),
707 },
708 Addresses: []string{"10.0.0.2"},
709 TargetRef: &v1.ObjectReference{Kind: "ExternalWorkload", Namespace: namespace, Name: "ew1"},
710 },
711 },
712 },
713 {
714 name: "Two Ready workloads with the same IPs",
715 service: &v1.Service{
716 ObjectMeta: metav1.ObjectMeta{
717 Name: "foobar",
718 Namespace: namespace,
719 CreationTimestamp: creationTimestamp,
720 },
721 Spec: v1.ServiceSpec{
722 Ports: []v1.ServicePort{
723 {Name: "tcp-example", TargetPort: intstr.FromInt32(80), Protocol: v1.ProtocolTCP},
724 {Name: "udp-example", TargetPort: intstr.FromInt32(161), Protocol: v1.ProtocolUDP},
725 {Name: "sctp-example", TargetPort: intstr.FromInt32(3456), Protocol: v1.ProtocolSCTP},
726 },
727 Selector: map[string]string{"foo": "bar"},
728 IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
729 },
730 },
731 externalWorkloads: []*ewv1beta1.ExternalWorkload{
732 {
733 ObjectMeta: metav1.ObjectMeta{
734 Namespace: namespace,
735 Name: "ew0",
736 Labels: map[string]string{"foo": "bar"},
737 DeletionTimestamp: nil,
738 },
739 Spec: ewv1beta1.ExternalWorkloadSpec{
740 WorkloadIPs: []ewv1beta1.WorkloadIP{
741 {
742 Ip: "10.0.0.1",
743 },
744 },
745 Ports: []ewv1beta1.PortSpec{
746 {Name: "tcp-example", Port: 80, Protocol: v1.ProtocolTCP},
747 {Name: "udp-example", Port: 161, Protocol: v1.ProtocolUDP},
748 {Name: "sctp-example", Port: 3456, Protocol: v1.ProtocolSCTP},
749 },
750 },
751 Status: ewv1beta1.ExternalWorkloadStatus{
752 Conditions: []ewv1beta1.WorkloadCondition{
753 {
754 Type: ewv1beta1.WorkloadReady,
755 Status: ewv1beta1.ConditionTrue,
756 },
757 },
758 },
759 },
760 {
761 ObjectMeta: metav1.ObjectMeta{
762 Namespace: namespace,
763 Name: "ew1",
764 Labels: map[string]string{"foo": "bar"},
765 DeletionTimestamp: nil,
766 },
767 Spec: ewv1beta1.ExternalWorkloadSpec{
768 WorkloadIPs: []ewv1beta1.WorkloadIP{
769 {
770 Ip: "10.0.0.1",
771 },
772 },
773 Ports: []ewv1beta1.PortSpec{
774 {Name: "tcp-example", Port: 80, Protocol: v1.ProtocolTCP},
775 {Name: "udp-example", Port: 161, Protocol: v1.ProtocolUDP},
776 {Name: "sctp-example", Port: 3456, Protocol: v1.ProtocolSCTP},
777 },
778 },
779 Status: ewv1beta1.ExternalWorkloadStatus{
780 Conditions: []ewv1beta1.WorkloadCondition{
781 {
782 Type: ewv1beta1.WorkloadReady,
783 Status: ewv1beta1.ConditionTrue,
784 },
785 },
786 },
787 },
788 },
789 expectedEndpointPorts: []discoveryv1.EndpointPort{
790 {
791 Name: ptr.To("udp-example"),
792 Protocol: protoPtr(v1.ProtocolUDP),
793 Port: ptr.To(int32(161)),
794 },
795 {
796 Name: ptr.To("sctp-example"),
797 Protocol: protoPtr(v1.ProtocolSCTP),
798 Port: ptr.To(int32(3456)),
799 },
800 {
801 Name: ptr.To("tcp-example"),
802 Protocol: protoPtr(v1.ProtocolTCP),
803 Port: ptr.To(int32(80)),
804 },
805 },
806 expectedEndpoints: []discoveryv1.Endpoint{
807 {
808 Conditions: discoveryv1.EndpointConditions{
809 Ready: ptr.To(true),
810 Serving: ptr.To(true),
811 Terminating: ptr.To(false),
812 },
813 Addresses: []string{"10.0.0.1"},
814 TargetRef: &v1.ObjectReference{Kind: "ExternalWorkload", Namespace: namespace, Name: "ew0"},
815 },
816 {
817 Conditions: discoveryv1.EndpointConditions{
818 Ready: ptr.To(true),
819 Serving: ptr.To(true),
820 Terminating: ptr.To(false),
821 },
822 Addresses: []string{"10.0.0.1"},
823 TargetRef: &v1.ObjectReference{Kind: "ExternalWorkload", Namespace: namespace, Name: "ew1"},
824 },
825 },
826 },
827 }
828
829 for _, testcase := range testcases {
830 t.Run(testcase.name, func(t *testing.T) {
831 client, actions, esController := newController(t)
832
833 for _, ew := range testcase.externalWorkloads {
834 esController.externalWorkloadsStore.Add(ew)
835 }
836 esController.serviceStore.Add(testcase.service)
837
838 _, err := esController.k8sAPI.Client.CoreV1().Services(testcase.service.Namespace).Create(context.TODO(), testcase.service, metav1.CreateOptions{})
839 if err != nil {
840 t.Errorf("Expected no error creating service, got: %s", err)
841 }
842 err = esController.syncService(fmt.Sprintf("%s/%s", testcase.service.Namespace, testcase.service.Name))
843 if err != nil {
844 t.Errorf("Expected no error, got: %s", err)
845 }
846
847
848 expectActions(t, actions(), 1, "create", "endpointslices")
849 sliceList, err := client.Client.DiscoveryV1().EndpointSlices(testcase.service.Namespace).List(context.TODO(), metav1.ListOptions{})
850 if err != nil {
851 t.Errorf("Expected no error fetching endpoint slices, got: %s", err)
852 }
853
854 if len(sliceList.Items) != 1 {
855 t.Errorf("Expected 1 endpoints slices")
856 }
857
858
859 slice := sliceList.Items[0]
860
861
862
863 if !reflect.DeepEqual(testcase.expectedEndpointPorts, slice.Ports) {
864 t.Error("actual ports do not match expected ones")
865 }
866
867
868
869
870
871 sort.Slice(slice.Endpoints, func(i, j int) bool {
872 return slice.Endpoints[i].TargetRef.Name < slice.Endpoints[j].TargetRef.Name
873 })
874
875 if !reflect.DeepEqual(testcase.expectedEndpoints, slice.Endpoints) {
876 t.Error("actual endpoints do not match expected ones")
877 }
878 })
879 }
880 }
881
882
883
884
885 func TestEwEndpointsChanged(t *testing.T) {
886 for _, tt := range []struct {
887 name string
888 old *ewv1beta1.ExternalWorkload
889 updated *ewv1beta1.ExternalWorkload
890 specChanged bool
891 }{
892 {
893 name: "no change",
894 old: makeExternalWorkload(
895 "1",
896 "wlkd1",
897 nil,
898 map[int32]string{1: "port-1"},
899 []string{"192.0.2.0"},
900 ),
901 updated: makeExternalWorkload(
902 "1",
903 "wlkd1",
904 nil,
905 map[int32]string{1: "port-1"},
906 []string{"192.0.2.0"},
907 ),
908 specChanged: false,
909 },
910 {
911 name: "updated workload adds an IP address",
912 old: makeExternalWorkload(
913 "1",
914 "wlkd1",
915 nil,
916 map[int32]string{1: "port-1"},
917 []string{"192.0.2.0"},
918 ),
919 updated: makeExternalWorkload(
920 "2",
921 "wlkd1",
922 nil,
923 map[int32]string{1: "port-1"},
924 []string{"192.0.2.0", "192.0.3.0"},
925 ),
926 specChanged: true,
927 },
928 {
929 name: "updated workload removes an IP address",
930 old: makeExternalWorkload(
931 "1",
932 "wlkd1",
933 nil,
934 map[int32]string{1: "port-1"},
935 []string{"192.0.2.0", "192.0.3.0"},
936 ),
937 updated: makeExternalWorkload(
938 "2",
939 "wlkd1",
940 nil,
941 map[int32]string{1: "port-1"},
942 []string{"192.0.2.0"},
943 ),
944 specChanged: true,
945 },
946 {
947 name: "updated workload changes an IP address",
948 old: makeExternalWorkload(
949 "1",
950 "wlkd1",
951 nil,
952 map[int32]string{1: "port-1"},
953 []string{"192.0.2.0"},
954 ),
955 updated: makeExternalWorkload(
956 "2",
957 "wlkd1",
958 nil,
959 map[int32]string{1: "port-1"},
960 []string{"192.0.3.0"},
961 ),
962 specChanged: true,
963 },
964 {
965 name: "updated workload adds new port",
966 old: makeExternalWorkload(
967 "1",
968 "wlkd1",
969 nil,
970 map[int32]string{1: "port-1"},
971 []string{"192.0.2.0"},
972 ),
973 updated: makeExternalWorkload(
974 "2",
975 "wlkd1",
976 nil,
977 map[int32]string{1: "port-1", 2: "port-2"},
978 []string{"192.0.2.0"},
979 ),
980 specChanged: true,
981 },
982 {
983 name: "updated workload removes port",
984 old: makeExternalWorkload(
985 "1",
986 "wlkd1",
987 nil,
988 map[int32]string{1: "port-1", 2: "port-2"},
989 []string{"192.0.2.0"},
990 ),
991 updated: makeExternalWorkload(
992 "2",
993 "wlkd1",
994 nil,
995 map[int32]string{1: "port-1"},
996 []string{"192.0.2.0"},
997 ),
998 specChanged: true,
999 },
1000 {
1001 name: "updated workload changes port number",
1002 old: makeExternalWorkload(
1003 "1",
1004 "wlkd1",
1005 nil,
1006 map[int32]string{1: "port-1"},
1007 []string{"192.0.2.0"},
1008 ),
1009 updated: makeExternalWorkload(
1010 "2",
1011 "wlkd1",
1012 nil,
1013 map[int32]string{2: "port-1"},
1014 []string{"192.0.2.0"},
1015 ),
1016 specChanged: true,
1017 },
1018 {
1019 name: "updated workload changes port name",
1020 old: makeExternalWorkload(
1021 "1",
1022 "wlkd1",
1023 nil,
1024 map[int32]string{1: "port-1"},
1025 []string{"192.0.2.0"},
1026 ),
1027 updated: makeExternalWorkload(
1028 "2",
1029 "wlkd1",
1030 nil,
1031 map[int32]string{1: "port-foo"},
1032 []string{"192.0.2.0"},
1033 ),
1034 specChanged: true,
1035 },
1036 {
1037 name: "updated workload removes port name",
1038 old: makeExternalWorkload(
1039 "1",
1040 "wlkd1",
1041 nil,
1042 map[int32]string{1: "port-1"},
1043 []string{"192.0.2.0"},
1044 ),
1045 updated: makeExternalWorkload(
1046 "2",
1047 "wlkd1",
1048 nil,
1049 map[int32]string{1: ""},
1050 []string{"192.0.2.0"},
1051 ),
1052 specChanged: true,
1053 },
1054 } {
1055 tt := tt
1056 t.Run(tt.name, func(t *testing.T) {
1057 specChanged, _ := ewEndpointsChanged(tt.old, tt.updated)
1058 if tt.specChanged != specChanged {
1059 t.Errorf("expected specChanged '%v', got '%v'", tt.specChanged, specChanged)
1060 }
1061 })
1062 }
1063 }
1064
1065
1066
1067 func TestWorkloadServicesToUpdate(t *testing.T) {
1068 for _, tt := range []struct {
1069 name string
1070 old *ewv1beta1.ExternalWorkload
1071 updated *ewv1beta1.ExternalWorkload
1072 k8sConfigs []string
1073 expectServices map[string]struct{}
1074 }{
1075 {
1076 name: "no change",
1077 old: makeExternalWorkload(
1078 "1",
1079 "wlkd1",
1080 map[string]string{"app": "test"},
1081 map[int32]string{1: "port-1"},
1082 []string{"192.0.2.0"},
1083 ),
1084 updated: makeExternalWorkload(
1085 "1",
1086 "wlkd1",
1087 map[string]string{"app": "test"},
1088 map[int32]string{1: "port-1"},
1089 []string{"192.0.2.0"},
1090 ),
1091 k8sConfigs: []string{`
1092 apiVersion: v1
1093 kind: Service
1094 metadata:
1095 name: svc-1
1096 namespace: ns
1097 spec:
1098 selector:
1099 app: test`,
1100 },
1101 expectServices: map[string]struct{}{},
1102 },
1103 {
1104 name: "labels and spec have changed",
1105 old: makeExternalWorkload(
1106 "1",
1107 "wlkd1",
1108 map[string]string{"app": "test-1"},
1109 map[int32]string{1: "port-1"},
1110 []string{"192.0.2.0"},
1111 ),
1112 updated: makeExternalWorkload(
1113 "2",
1114 "wlkd1",
1115 map[string]string{"app": "test-2"},
1116 map[int32]string{2: "port-1"},
1117 []string{"192.0.2.0"},
1118 ),
1119 k8sConfigs: []string{`
1120 apiVersion: v1
1121 kind: Service
1122 metadata:
1123 name: svc-1
1124 namespace: ns
1125 spec:
1126 selector:
1127 app: test-1`, `
1128 apiVersion: v1
1129 kind: Service
1130 metadata:
1131 name: svc-2
1132 namespace: ns
1133 spec:
1134 selector:
1135 app: test-2`,
1136 },
1137 expectServices: map[string]struct{}{"ns/svc-1": {}, "ns/svc-2": {}},
1138 },
1139 {
1140 name: "spec has changed",
1141 old: makeExternalWorkload(
1142 "1",
1143 "wlkd1",
1144 map[string]string{"app": "test-1"},
1145 map[int32]string{1: "port-1"},
1146 []string{"192.0.2.0"},
1147 ),
1148 updated: makeExternalWorkload(
1149 "2",
1150 "wlkd1",
1151 map[string]string{"app": "test-1"},
1152 map[int32]string{2: "port-1"},
1153 []string{"192.0.2.0"},
1154 ),
1155 k8sConfigs: []string{`
1156 apiVersion: v1
1157 kind: Service
1158 metadata:
1159 name: svc-1
1160 namespace: ns
1161 spec:
1162 selector:
1163 app: test-1`,
1164 },
1165 expectServices: map[string]struct{}{"ns/svc-1": {}},
1166 },
1167 {
1168 name: "labels have changed",
1169 old: makeExternalWorkload(
1170 "1",
1171 "wlkd1",
1172 map[string]string{"app": "test-1", "env": "staging"},
1173 map[int32]string{1: "port-1"},
1174 []string{"192.0.2.0"},
1175 ),
1176 updated: makeExternalWorkload(
1177 "2",
1178 "wlkd1",
1179 map[string]string{"app": "test-1", "env": "prod"},
1180 map[int32]string{1: "port-1"},
1181 []string{"192.0.2.0"},
1182 ),
1183 k8sConfigs: []string{`
1184 apiVersion: v1
1185 kind: Service
1186 metadata:
1187 name: internal
1188 namespace: ns
1189 spec:
1190 selector:
1191 app: test-1`, `
1192 apiVersion: v1
1193 kind: Service
1194 metadata:
1195 name: staging
1196 namespace: ns
1197 spec:
1198 selector:
1199 env: staging`, `
1200 apiVersion: v1
1201 kind: Service
1202 metadata:
1203 name: prod
1204 namespace: ns
1205 spec:
1206 selector:
1207 env: prod`,
1208 },
1209 expectServices: map[string]struct{}{"ns/staging": {}, "ns/prod": {}},
1210 }} {
1211 tt := tt
1212 t.Run(tt.name, func(t *testing.T) {
1213 k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
1214 if err != nil {
1215 t.Fatalf("unexpected error %v", err)
1216 }
1217
1218 ec, err := NewEndpointsController(k8sAPI, "my-hostname", "controlplane-ns", make(chan struct{}), false)
1219 if err != nil {
1220 t.Fatalf("unexpected error %v", err)
1221 }
1222
1223 ec.Start()
1224 k8sAPI.Sync(nil)
1225
1226 services := ec.getServicesToUpdateOnExternalWorkloadChange(tt.old, tt.updated)
1227 if len(services) != len(tt.expectServices) {
1228 t.Fatalf("expected %d services to update, got %d services instead", len(tt.expectServices), len(services))
1229 }
1230
1231 for svc := range services {
1232 if _, ok := tt.expectServices[svc]; !ok {
1233 t.Errorf("unexpected service key %s found in list of results", svc)
1234 }
1235 }
1236 })
1237
1238 }
1239 }
1240
1241
1242
1243
1244
1245
1246
1247
1248 func TestLeaderElectionSyncsState(t *testing.T) {
1249 client, actions, esController := newController(t)
1250 ns := "test-ns"
1251 service := createService(t, esController, ns, "test-svc")
1252 ew1 := newExternalWorkload(1, ns, false, true)
1253 esController.serviceStore.Add(service)
1254 esController.externalWorkloadsStore.Add(ew1)
1255
1256
1257 err := esController.addHandlers()
1258 if err != nil {
1259 t.Fatalf("unexpected error when registering client-go callbacks: %v", err)
1260 }
1261
1262 err = esController.syncService(fmt.Sprintf("%s/%s", ns, service.Name))
1263 if err != nil {
1264 t.Fatalf("unexpected error when processing service %s/%s: %v", ns, service.Name, err)
1265 }
1266 expectActions(t, actions(), 1, "create", "endpointslices")
1267
1268 slices, err := client.Client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
1269 if err != nil {
1270 t.Errorf("expected no error fetching endpoint slices, got: %s", err)
1271 }
1272 if len(slices.Items) != 1 {
1273 t.Errorf("expected 1 endpoint slices, got: %d", len(slices.Items))
1274 }
1275 sliceName := slices.Items[0].Name
1276
1277
1278
1279 err = esController.removeHandlers()
1280 if err != nil {
1281 t.Fatalf("unexpected error when de-registering client-go callbacks: %v", err)
1282 }
1283 err = client.Client.DiscoveryV1().EndpointSlices(ns).Delete(context.TODO(), sliceName, metav1.DeleteOptions{})
1284 if err != nil {
1285 t.Fatalf("unexpected error when deleting endpointslice %s/%s: %v", ns, sliceName, err)
1286 }
1287 slices, err = client.Client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
1288 if err != nil {
1289 t.Errorf("expected no error fetching endpoint slices, got: %s", err)
1290 }
1291 if len(slices.Items) != 0 {
1292 t.Errorf("expected 0 endpoint slices, got: %d", len(slices.Items))
1293 }
1294
1295
1296
1297 esController.addHandlers()
1298 err = esController.syncService(fmt.Sprintf("%s/%s", ns, service.Name))
1299 if err != nil {
1300 t.Fatalf("unexpected error when processing service %s/%s: %v", ns, service.Name, err)
1301 }
1302 expectActions(t, actions(), 1, "create", "endpointslices")
1303 slices, err = client.Client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
1304 if err != nil {
1305 t.Errorf("expected no error fetching endpoint slices, got: %s", err)
1306 }
1307 if len(slices.Items) != 1 {
1308 t.Errorf("expected 1 endpoint slices, got: %d", len(slices.Items))
1309 }
1310 if slices.Items[0].Name == sliceName {
1311 t.Fatalf("expected newly created slice's name to be different than the initial slice, got: %s", sliceName)
1312 }
1313
1314 }
1315
1316
1317 func protoPtr(proto v1.Protocol) *v1.Protocol {
1318 return &proto
1319 }
1320
1321 func newStatusCondition(ready bool) ewv1beta1.WorkloadCondition {
1322 var status ewv1beta1.WorkloadConditionStatus
1323 if ready {
1324 status = ewv1beta1.ConditionTrue
1325 } else {
1326 status = ewv1beta1.ConditionFalse
1327 }
1328 return ewv1beta1.WorkloadCondition{
1329 Type: ewv1beta1.WorkloadReady,
1330 Status: status,
1331 LastProbeTime: metav1.Time{},
1332 LastTransitionTime: metav1.NewTime(time.Now()),
1333 Reason: "test",
1334 Message: "test",
1335 }
1336 }
1337
1338
1339 func expectActions(t *testing.T, actions []k8stesting.Action, num int, verb, resource string) {
1340 t.Helper()
1341
1342 if num > len(actions) {
1343 t.Fatalf("len of actions %v is unexpected. Expected to be at least %v", len(actions), num+1)
1344 }
1345
1346 for i := 0; i < num; i++ {
1347 relativePos := len(actions) - i - 1
1348 if actions[relativePos].GetVerb() != verb {
1349 t.Errorf("Expected action -%d verb to be %s, was: %s", i, verb, actions[relativePos].GetVerb())
1350 }
1351 if resource != actions[relativePos].GetResource().Resource {
1352 t.Errorf("Expected action -%d resource to be %s, was: %s", i, resource, actions[relativePos].GetResource().Resource)
1353 }
1354 }
1355 }
1356
1357 func createService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) *v1.Service {
1358 t.Helper()
1359 service := &v1.Service{
1360 ObjectMeta: metav1.ObjectMeta{
1361 Name: serviceName,
1362 Namespace: namespace,
1363 CreationTimestamp: metav1.NewTime(time.Now()),
1364 UID: types.UID(namespace + "-" + serviceName),
1365 },
1366 Spec: v1.ServiceSpec{
1367 Ports: []v1.ServicePort{{TargetPort: intstr.FromInt32(80)}},
1368 Selector: map[string]string{"foo": "bar"},
1369 IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
1370 },
1371 }
1372 esController.serviceStore.Add(service)
1373 _, err := esController.k8sAPI.Client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
1374 if err != nil {
1375 t.Error("Expected no error creating service")
1376 }
1377 return service
1378 }
1379
1380 func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) {
1381 t.Helper()
1382 createService(t, esController, namespace, serviceName)
1383
1384 err := esController.syncService(fmt.Sprintf("%s/%s", namespace, serviceName))
1385 if err != nil {
1386 t.Error("Expected no error syncing service")
1387 }
1388 }
1389
View as plain text