1 package externalworkload
2
3 import (
4 "context"
5 "fmt"
6 "reflect"
7 "strings"
8 "testing"
9
10 ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
11 "github.com/linkerd/linkerd2/controller/k8s"
12
13 corev1 "k8s.io/api/core/v1"
14 discoveryv1 "k8s.io/api/discovery/v1"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/apimachinery/pkg/labels"
17 "k8s.io/apimachinery/pkg/runtime"
18 "k8s.io/apimachinery/pkg/runtime/schema"
19 k8stesting "k8s.io/client-go/testing"
20 epsliceutil "k8s.io/endpointslice/util"
21
22 "k8s.io/apimachinery/pkg/types"
23 "k8s.io/apimachinery/pkg/util/intstr"
24 "k8s.io/apimachinery/pkg/util/rand"
25 "sigs.k8s.io/yaml"
26 )
27
28 var (
29 httpUnnamedPort = corev1.ServicePort{
30 Port: 8080,
31 TargetPort: intstr.IntOrString{
32 Type: intstr.Int,
33 IntVal: 8080,
34 },
35 }
36
37 httpNamedPort = corev1.ServicePort{
38 TargetPort: intstr.IntOrString{
39 Type: intstr.String,
40 StrVal: "http",
41 },
42 }
43
44 defaultTestEndpointsQuota = 100
45
46 testControllerName = "test-controller"
47 )
48
49
50
51
52
53 func TestReconcilerCreatesNewEndpointSlice(t *testing.T) {
54
55
56 k8sAPI, err := k8s.NewFakeAPI([]string{}...)
57 if err != nil {
58 t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
59 }
60
61 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "")
62 ew := makeExternalWorkload("1", "wlkd-1", map[string]string{"app": ""}, map[int32]string{8080: ""}, []string{"192.0.2.0"})
63 ew.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ew.Namespace, ew.Name))
64
65 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
66 err = r.reconcile(svc, []*ewv1beta1.ExternalWorkload{ew}, nil)
67 if err != nil {
68 t.Fatalf("unexpected error when reconciling endpoints: %v", err)
69 }
70
71 expectedEndpoint := makeEndpoint([]string{"192.0.2.0"}, true, ew)
72 es := fetchEndpointSlices(t, k8sAPI, svc)
73 if len(es) != 1 {
74 t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(es))
75 }
76
77 if len(es[0].Endpoints) != 1 {
78 t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(es[0].Endpoints))
79 }
80
81 if es[0].AddressType != discoveryv1.AddressTypeIPv4 {
82 t.Fatalf("expected endpointslice to have AF %s, got %s instead", discoveryv1.AddressTypeIPv4, es[0].AddressType)
83 }
84 ep := es[0].Endpoints[0]
85 diffEndpoints(t, ep, expectedEndpoint)
86 }
87
88
89
90
91 func TestReconcilerCreatesNewEndpointSliceHeadless(t *testing.T) {
92
93
94 k8sAPI, err := k8s.NewFakeAPI([]string{}...)
95 if err != nil {
96 t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
97 }
98
99 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "")
100 svc.Spec.ClusterIP = corev1.ClusterIPNone
101 ew := makeExternalWorkload("1", "wlkd-1", map[string]string{"app": ""}, map[int32]string{8080: ""}, []string{"192.0.2.0"})
102 ew.Namespace = "default"
103 ew.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ew.Namespace, ew.Name))
104
105 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
106 err = r.reconcile(svc, []*ewv1beta1.ExternalWorkload{ew}, nil)
107 if err != nil {
108 t.Fatalf("unexpected error when reconciling endpoints: %v", err)
109 }
110
111 expectedEndpoint := makeEndpoint([]string{"192.0.2.0"}, true, ew)
112 es := fetchEndpointSlices(t, k8sAPI, svc)
113 if len(es) != 1 {
114 t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(es))
115 }
116
117 if len(es[0].Endpoints) != 1 {
118 t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(es[0].Endpoints))
119 }
120
121 if es[0].AddressType != discoveryv1.AddressTypeIPv4 {
122 t.Fatalf("expected endpointslice to have AF %s, got %s instead", discoveryv1.AddressTypeIPv4, es[0].AddressType)
123 }
124 ep := es[0].Endpoints[0]
125 diffEndpoints(t, ep, expectedEndpoint)
126
127 if _, ok := es[0].Labels[corev1.IsHeadlessService]; !ok {
128 t.Errorf("expected \"%s\" label to be present on the service", corev1.IsHeadlessService)
129 }
130
131 if ep.Hostname == nil {
132 t.Fatalf("expected endpoint to have a hostname")
133 }
134
135 if *ep.Hostname != ew.Name {
136 t.Errorf("expected \"%s\" as a hostname, got: %s", ew.Name, *ep.Hostname)
137 }
138
139 }
140
141
142
143 func TestReconcilerUpdatesEndpointSlice(t *testing.T) {
144
145 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "")
146
147
148 ewCreated := makeExternalWorkload("1", "wlkd-1", map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{"192.0.2.1"})
149 ewCreated.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ewCreated.Namespace, ewCreated.Name))
150
151
152 port := int32(8080)
153 ports := []discoveryv1.EndpointPort{{
154 Port: &port,
155 }}
156 es := makeEndpointSlice(svc, ports)
157 endpoints := []discoveryv1.Endpoint{}
158 endpoints = append(endpoints, externalWorkloadToEndpoint(discoveryv1.AddressTypeIPv4, ewCreated, svc))
159 es.Endpoints = endpoints
160 es.Generation = 1
161
162
163 ewUpdated := makeExternalWorkload("1", "wlkd-2", map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{"192.0.2.0"})
164 ewUpdated.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ewUpdated.Namespace, ewUpdated.Name))
165
166
167 k8sAPI, err := k8s.NewFakeAPI(endpointSliceAsYaml(t, es))
168 if err != nil {
169 t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
170 }
171
172 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
173 err = r.reconcile(svc, []*ewv1beta1.ExternalWorkload{ewCreated, ewUpdated}, []*discoveryv1.EndpointSlice{es})
174 if err != nil {
175 t.Fatalf("unexpected error when reconciling endpoints: %v", err)
176 }
177
178 slice, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Get(context.Background(), es.Name, metav1.GetOptions{})
179 if err != nil {
180 t.Fatalf("unexpected error when retrieving endpointslice: %v", err)
181 }
182 if len(slice.Endpoints) != 2 {
183 t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 2, len(slice.Endpoints))
184 }
185
186 if slice.AddressType != discoveryv1.AddressTypeIPv4 {
187 t.Fatalf("expected endpointslice to have AF %s, got %s instead", discoveryv1.AddressTypeIPv4, slice.AddressType)
188 }
189
190 for _, ep := range slice.Endpoints {
191 if ep.TargetRef.Name == ewUpdated.Name {
192 expectedEndpoint := makeEndpoint([]string{"192.0.2.0"}, true, ewUpdated)
193 diffEndpoints(t, ep, expectedEndpoint)
194 } else if ep.TargetRef.Name == ewCreated.Name {
195 expectedEndpoint := makeEndpoint([]string{"192.0.2.1"}, true, ewCreated)
196 diffEndpoints(t, ep, expectedEndpoint)
197 } else {
198 t.Errorf("found unexpected targetRef name %s", ep.TargetRef.Name)
199 }
200 }
201 }
202
203
204
205 func TestReconcilerUpdatesEndpointSliceInPlace(t *testing.T) {
206
207 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "")
208
209
210 ewCreated := makeExternalWorkload("1", "wlkd-1", map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{"192.0.2.1"})
211 ewCreated.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ewCreated.Namespace, ewCreated.Name))
212
213
214 port := int32(8080)
215 ports := []discoveryv1.EndpointPort{{
216 Port: &port,
217 }}
218 es := makeEndpointSlice(svc, ports)
219 endpoints := []discoveryv1.Endpoint{}
220 endpoints = append(endpoints, externalWorkloadToEndpoint(discoveryv1.AddressTypeIPv4, ewCreated, svc))
221 es.Endpoints = endpoints
222 es.Generation = 1
223
224
225 k8sAPI, err := k8s.NewFakeAPI(endpointSliceAsYaml(t, es))
226 if err != nil {
227 t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
228 }
229
230 if err != nil {
231 t.Fatalf("unexpected error when retrieving endpointslice: %v", err)
232 }
233
234
235 ewCreated.Labels = map[string]string{
236 corev1.LabelTopologyZone: "zone1",
237 }
238
239 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
240 err = r.reconcile(svc, []*ewv1beta1.ExternalWorkload{ewCreated, ewCreated}, []*discoveryv1.EndpointSlice{es})
241 if err != nil {
242 t.Fatalf("unexpected error when reconciling endpoints: %v", err)
243 }
244
245 slice, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Get(context.Background(), es.Name, metav1.GetOptions{})
246 if err != nil {
247 t.Fatalf("unexpected error when retrieving endpointslice: %v", err)
248 }
249 if len(slice.Endpoints) != 1 {
250 t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(slice.Endpoints))
251 }
252
253 if slice.AddressType != discoveryv1.AddressTypeIPv4 {
254 t.Fatalf("expected endpointslice to have AF %s, got %s instead", discoveryv1.AddressTypeIPv4, slice.AddressType)
255 }
256
257 if slice.Generation == 1 {
258 t.Fatalf("expected endpointslice to have its generation bumped after update")
259 }
260
261 if *slice.Endpoints[0].Zone != "zone1" {
262 t.Fatalf("expected endpoint to be updated with new zone topology")
263 }
264 }
265
266
267
268
269 func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
270 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpNamedPort}, "192.0.2.1")
271 ews := []*ewv1beta1.ExternalWorkload{}
272
273
274 for i := 0; i < 300; i++ {
275 ready := !(i%3 == 0)
276 offset := i % 5
277 genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
278 genPort := int32(8080 + offset)
279 ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{genPort: "http"}, []string{genIp})
280 ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
281 ews = append(ews, ew)
282 }
283
284 k8sAPI, err := k8s.NewFakeAPI([]string{}...)
285 if err != nil {
286 t.Fatalf("unexpected error when initializing API client: %v", err)
287 }
288
289
290
291 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
292 r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
293 slices := fetchEndpointSlices(t, k8sAPI, svc)
294 expectedNumSlices := 5
295 if len(slices) != expectedNumSlices {
296 t.Fatalf("expected %d slices to be created, got %d instead", expectedNumSlices, len(slices))
297 }
298
299
300 expectSlicesWithLengths(t, []int{60, 60, 60, 60, 60}, slices)
301 expectedSlices := []discoveryv1.EndpointSlice{}
302 for i := range slices {
303 port := int32(8080 + i)
304 expectedSlices = append(expectedSlices, discoveryv1.EndpointSlice{
305 Ports: []discoveryv1.EndpointPort{
306 {
307 Port: &port,
308 },
309 },
310 AddressType: discoveryv1.AddressTypeIPv4,
311 })
312 }
313
314
315 diffEndpointSlicePorts(t, expectedSlices, slices)
316 }
317
318
319
320
321
322 func TestReconcileManyWorkloads(t *testing.T) {
323 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
324
325 ews := []*ewv1beta1.ExternalWorkload{}
326 for i := 0; i < 250; i++ {
327 ready := !(i%3 == 0)
328 genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
329 ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
330
331 ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
332 ews = append(ews, ew)
333 }
334
335 k8sAPI, actions := newClientset(t, []string{})
336 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
337 r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
338 expectActions(t, actions(), 3, "create", "endpointslices")
339
340 slices := fetchEndpointSlices(t, k8sAPI, svc)
341 expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
342 }
343
344
345
346
347
348
349
350 func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
351 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
352
353 ews := []*ewv1beta1.ExternalWorkload{}
354 for i := 0; i < 250; i++ {
355 ready := !(i%3 == 0)
356 genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
357 ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
358
359 ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
360 ews = append(ews, ew)
361 }
362
363
364 port := int32(8080)
365 esPorts := []discoveryv1.EndpointPort{{
366 Port: &port,
367 }}
368
369 es1 := makeEndpointSlice(svc, esPorts)
370
371 for i := 1; i < len(ews)-4; i += 4 {
372 addrs := []string{ews[i].Spec.WorkloadIPs[0].Ip}
373 isReady := IsEwReady(ews[i])
374 es1.Endpoints = append(es1.Endpoints, makeEndpoint(addrs, isReady, ews[i]))
375 }
376
377 es2 := makeEndpointSlice(svc, esPorts)
378
379 for i := 3; i < len(ews)-4; i += 4 {
380 addrs := []string{ews[i].Spec.WorkloadIPs[0].Ip}
381 isReady := IsEwReady(ews[i])
382 es2.Endpoints = append(es2.Endpoints, makeEndpoint(addrs, isReady, ews[i]))
383 }
384
385 existingSlices := []*discoveryv1.EndpointSlice{es1, es2}
386 cmc := newCacheMutationCheck(existingSlices)
387 k8sAPI, actions := newClientset(t, []string{})
388 for _, slice := range existingSlices {
389 _, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
390 if err != nil {
391 t.Fatalf("unexpected error when creating Kubernetes obj: %v", err)
392 }
393 }
394
395 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
396 r.reconcile(svc, ews, existingSlices)
397 expectActions(t, actions(), 2, "update", "endpointslices")
398
399 slices := fetchEndpointSlices(t, k8sAPI, svc)
400 expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
401
402
403 cmc.Check(t)
404 }
405
406
407
408 func TestReconcileEndpointSlicesUpdatingSvc(t *testing.T) {
409 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
410
411 ews := []*ewv1beta1.ExternalWorkload{}
412 for i := 0; i < 250; i++ {
413 ready := !(i%3 == 0)
414 genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
415 ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
416
417 ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
418 ews = append(ews, ew)
419 }
420
421 k8sAPI, actions := newClientset(t, []string{})
422 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
423 r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
424
425 slices := fetchEndpointSlices(t, k8sAPI, svc)
426 expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
427 for _, ew := range ews {
428 ew.Spec.Ports[0].Port = int32(81)
429 }
430 svc.Spec.Ports[0].TargetPort.IntVal = 81
431
432 r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{&slices[0], &slices[1], &slices[2]})
433 expectActions(t, actions(), 3, "update", "endpointslices")
434 slices = fetchEndpointSlices(t, k8sAPI, svc)
435 expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
436 for _, slice := range slices {
437 if *slice.Ports[0].Port != 81 {
438 t.Errorf("expected targetPort value to be 81, got: %d", slice.Ports[0].Port)
439 }
440 }
441 }
442
443
444
445
446
447 func TestReconcileEndpointSlicesLabelsUpdatingSvc(t *testing.T) {
448 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
449
450 ews := []*ewv1beta1.ExternalWorkload{}
451 for i := 0; i < 250; i++ {
452 ready := !(i%3 == 0)
453 genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
454 ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
455
456 ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
457 ews = append(ews, ew)
458 }
459
460 k8sAPI, actions := newClientset(t, []string{})
461 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
462 r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
463
464 slices := fetchEndpointSlices(t, k8sAPI, svc)
465 expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
466
467
468 svc.Labels = map[string]string{"foo": "bar"}
469 r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{&slices[0], &slices[1], &slices[2]})
470 expectActions(t, actions(), 3, "update", "endpointslices")
471
472 slices = fetchEndpointSlices(t, k8sAPI, svc)
473 expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
474
475 for _, slice := range slices {
476 w, ok := slice.Labels["foo"]
477 if !ok {
478 t.Errorf("expected label \"foo\" from parent service not found")
479 } else if "bar" != w {
480 t.Errorf("expected EndpointSlice to have parent service labels: have %s value, expected bar", w)
481 }
482 }
483 }
484
485
486
487 func TestReconcileEndpointSlicesReservedLabelsSvc(t *testing.T) {
488 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
489
490 ews := []*ewv1beta1.ExternalWorkload{}
491 for i := 0; i < 250; i++ {
492 ready := !(i%3 == 0)
493 genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
494 ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
495
496 ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
497 ews = append(ews, ew)
498 }
499
500 k8sAPI, actions := newClientset(t, []string{})
501 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
502 r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
503 numActionExpected := 3
504
505 slices := fetchEndpointSlices(t, k8sAPI, svc)
506 expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
507 numActionExpected++
508
509
510 svc.Labels = map[string]string{discoveryv1.LabelServiceName: "bad", discoveryv1.LabelManagedBy: "actor", corev1.IsHeadlessService: "invalid"}
511 r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{&slices[0], &slices[1], &slices[2]})
512 slices = fetchEndpointSlices(t, k8sAPI, svc)
513 numActionExpected++
514 if len(actions()) != numActionExpected {
515 t.Errorf("expected %d actions, got %d instead", numActionExpected, len(actions()))
516 }
517
518 expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
519
520 for _, slice := range slices {
521 if v := slice.Labels[discoveryv1.LabelServiceName]; v == "bad" {
522 t.Errorf("unexpected label value \"%s\" from parent service found on slice", "bad")
523 }
524
525 if v := slice.Labels[discoveryv1.LabelManagedBy]; v == "actor" {
526 t.Errorf("unexpected label value \"%s\" from parent service found on slice", "actor")
527 }
528
529 if v := slice.Labels[corev1.IsHeadlessService]; v == "invalid" {
530 t.Errorf("unexpected label value \"%s\" from parent service found on slice", "invalid")
531 }
532 }
533 }
534
535 func TestEndpointSlicesAreRecycled(t *testing.T) {
536 svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
537
538 ews := []*ewv1beta1.ExternalWorkload{}
539 for i := 0; i < 300; i++ {
540 ready := !(i%3 == 0)
541 genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
542 ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
543
544 ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
545 ews = append(ews, ew)
546 }
547
548
549 port := int32(8080)
550 esPorts := []discoveryv1.EndpointPort{{
551 Port: &port,
552 }}
553
554
555 existingSlices := []*discoveryv1.EndpointSlice{}
556 for i, ew := range ews {
557 sliceNum := i / 30
558 if i%30 == 0 {
559 existingSlices = append(existingSlices, makeEndpointSlice(svc, esPorts))
560 }
561
562 addrs := []string{ews[i].Spec.WorkloadIPs[0].Ip}
563 isReady := IsEwReady(ews[i])
564 existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, makeEndpoint(addrs, isReady, ew))
565 }
566
567 cmc := newCacheMutationCheck(existingSlices)
568 k8sAPI, err := k8s.NewFakeAPI([]string{}...)
569 if err != nil {
570 t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
571 }
572
573 for _, slice := range existingSlices {
574 _, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
575 if err != nil {
576 t.Fatalf("unexpected error when creating Kubernetes obj: %v", err)
577 }
578 }
579
580 for _, ew := range ews {
581 ew.Spec.Ports[0].Port = int32(81)
582 }
583
584
585 svc.Spec.Ports[0].TargetPort.IntVal = 81
586 r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
587 r.reconcile(svc, ews, existingSlices)
588
589 slices := fetchEndpointSlices(t, k8sAPI, svc)
590 expectSlicesWithLengths(t, []int{100, 100, 100}, slices)
591
592 cmc.Check(t)
593 }
594
595 func newClientset(t *testing.T, k8sConfigs []string) (*k8s.API, func() []k8stesting.Action) {
596 k8sAPI, actions, err := k8s.NewFakeAPIWithActions(k8sConfigs...)
597
598 if err != nil {
599 t.Fatalf("unexpected error %v", err)
600 }
601
602 return k8sAPI, actions
603 }
604
605 func makeEndpointSlice(svc *corev1.Service, ports []discoveryv1.EndpointPort) *discoveryv1.EndpointSlice {
606
607 ownerRef := metav1.NewControllerRef(svc, schema.GroupVersionKind{Version: "v1", Kind: "Service"})
608 slice := &discoveryv1.EndpointSlice{
609 ObjectMeta: metav1.ObjectMeta{
610 Name: fmt.Sprintf("linkerd-external-%s-%s", svc.Name, rand.String(8)),
611 Namespace: svc.Namespace,
612 Labels: map[string]string{},
613 OwnerReferences: []metav1.OwnerReference{*ownerRef},
614 },
615 AddressType: discoveryv1.AddressTypeIPv4,
616 Endpoints: []discoveryv1.Endpoint{},
617 Ports: ports,
618 }
619 labels, _ := setEndpointSliceLabels(slice, svc, testControllerName)
620 slice.Labels = labels
621 return slice
622 }
623
624
625
626 func expectSlicesWithLengths(t *testing.T, expectedLengths []int, es []discoveryv1.EndpointSlice) {
627 t.Helper()
628 noMatch := []string{}
629 for _, slice := range es {
630 epLen := len(slice.Endpoints)
631 matched := false
632 for i := 0; i < len(expectedLengths); i++ {
633 if epLen == expectedLengths[i] {
634 matched = true
635 expectedLengths = append(expectedLengths[:i], expectedLengths[i+1:]...)
636 break
637 }
638 }
639
640 if !matched {
641 noMatch = append(noMatch, fmt.Sprintf("%s/%s (%d)", slice.Namespace, slice.Name, len(slice.Endpoints)))
642 }
643 }
644
645 if len(noMatch) > 0 {
646 t.Fatalf("slices %s did not match the required lengths, unmatched lengths: %v", strings.Join(noMatch, ", "), expectedLengths)
647 }
648 }
649
650 func diffEndpointSlicePorts(t *testing.T, expected, actual []discoveryv1.EndpointSlice) {
651 t.Helper()
652 if len(expected) != len(actual) {
653 t.Fatalf("expected %d slices, got %d instead", len(expected), len(actual))
654 }
655
656 unmatched := []discoveryv1.EndpointSlice{}
657 for _, actualSlice := range actual {
658 matched := false
659 for i := 0; i < len(expected); i++ {
660 expectedSlice := expected[i]
661 expectedHash := epsliceutil.NewPortMapKey(expectedSlice.Ports)
662 actualHash := epsliceutil.NewPortMapKey(actualSlice.Ports)
663
664 if (actualSlice.AddressType == expectedSlice.AddressType) &&
665 (actualHash == expectedHash) {
666 matched = true
667 expected = append(expected[:i], expected[i+1:]...)
668 break
669 }
670 }
671
672 if !matched {
673 unmatched = append(unmatched, actualSlice)
674 }
675 }
676
677 if len(expected) != 0 {
678 t.Errorf("expected slices not found in actual list of EndpointSlices")
679 }
680
681 if len(unmatched) > 0 {
682 t.Errorf("found %d slices that do not match expected ports", len(unmatched))
683 }
684 }
685
686
687
688
689
690 func endpointSliceAsYaml(t *testing.T, es *discoveryv1.EndpointSlice) string {
691 if es.Name == "" {
692 es.Name = fmt.Sprintf("%s-%s", es.ObjectMeta.GenerateName, rand.String(5))
693 es.GenerateName = ""
694 }
695 es.TypeMeta = metav1.TypeMeta{
696 APIVersion: "discovery.k8s.io/v1",
697 Kind: "EndpointSlice",
698 }
699
700 b, err := yaml.Marshal(es)
701 if err != nil {
702 t.Fatalf("unexpected error when serializing endpointslices to yaml")
703 }
704
705 return string(b)
706
707 }
708
709 func makeIPv4Service(selector map[string]string, ports []corev1.ServicePort, clusterIP string) *corev1.Service {
710 return &corev1.Service{
711 ObjectMeta: metav1.ObjectMeta{
712 Name: "test-svc",
713 Namespace: "default",
714 UID: "default-test-svc",
715 },
716 Spec: corev1.ServiceSpec{
717 Ports: ports,
718 Selector: selector,
719 ClusterIP: clusterIP,
720 IPFamilies: []corev1.IPFamily{corev1.IPv4Protocol},
721 },
722 Status: corev1.ServiceStatus{},
723 }
724 }
725
726 func makeEndpoint(addrs []string, isReady bool, ew *ewv1beta1.ExternalWorkload) discoveryv1.Endpoint {
727 rdy := &isReady
728 term := !isReady
729 ep := discoveryv1.Endpoint{
730 Addresses: addrs,
731 Conditions: discoveryv1.EndpointConditions{
732 Ready: rdy,
733 Serving: rdy,
734 Terminating: &term,
735 },
736 TargetRef: &corev1.ObjectReference{
737 Kind: ew.Kind,
738 Namespace: ew.Namespace,
739 Name: ew.Name,
740 UID: ew.UID,
741 },
742 }
743 return ep
744 }
745
746 func fetchEndpointSlices(t *testing.T, k8sAPI *k8s.API, svc *corev1.Service) []discoveryv1.EndpointSlice {
747 t.Helper()
748 selector := labels.Set(map[string]string{
749 discoveryv1.LabelServiceName: svc.Name,
750 discoveryv1.LabelManagedBy: testControllerName,
751 }).AsSelectorPreValidated()
752 fetchedSlices, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).List(context.Background(), metav1.ListOptions{
753 LabelSelector: selector.String(),
754 })
755 if err != nil {
756 t.Fatalf("unexpected error when fetching endpointslices: %v", err)
757 }
758
759 return fetchedSlices.Items
760 }
761
762 func diffEndpoints(t *testing.T, actual, expected discoveryv1.Endpoint) {
763 t.Helper()
764 if len(actual.Addresses) != len(expected.Addresses) {
765 t.Errorf("expected %d addresses, got %d instead", len(expected.Addresses), len(actual.Addresses))
766 }
767
768 if actual.Conditions.Ready != nil && expected.Conditions.Ready != nil {
769 if *actual.Conditions.Ready != *expected.Conditions.Ready {
770 t.Errorf("expected \"ready\" condition to be %t, got %t instead", *expected.Conditions.Ready, *actual.Conditions.Ready)
771 }
772 }
773
774 if actual.Conditions.Serving != nil && expected.Conditions.Serving != nil {
775 if *actual.Conditions.Serving != *expected.Conditions.Serving {
776 t.Errorf("expected \"serving\" condition to be %t, got %t instead", *expected.Conditions.Serving, *actual.Conditions.Serving)
777 }
778 }
779
780 if actual.Conditions.Terminating != nil && expected.Conditions.Terminating != nil {
781 if *actual.Conditions.Terminating != *expected.Conditions.Terminating {
782 t.Errorf("expected \"terminating\" condition to be %t, got %t instead", *expected.Conditions.Terminating, *actual.Conditions.Terminating)
783 }
784 }
785
786 if actual.Zone != nil && expected.Zone != nil {
787 if *actual.Zone != *expected.Zone {
788 t.Errorf("expected \"zone=%s\", got \"zone=%s\" instead", *expected.Zone, *actual.Zone)
789 }
790 }
791
792 actualAddrs := toSet(actual.Addresses)
793 expAddrs := toSet(expected.Addresses)
794 for actualAddr := range actualAddrs {
795 if _, found := expAddrs[actualAddr]; !found {
796 t.Errorf("found unexpected address %s in the actual endpoint", actualAddr)
797 }
798 }
799
800 for expAddr := range expAddrs {
801 if _, found := actualAddrs[expAddr]; !found {
802 t.Errorf("expected to find address %s in the actual endpoint", expAddr)
803 }
804 }
805
806 expRef := expected.TargetRef
807 actRef := actual.TargetRef
808 if expRef.UID != actRef.UID {
809 t.Errorf("expected targetRef with UID %s; got %s instead", expRef.UID, actRef.UID)
810 }
811
812 if expRef.Name != actRef.Name {
813 t.Errorf("expected targetRef with name %s; got %s instead", expRef.Name, actRef.Name)
814 }
815
816 }
817
818
819
820
821
822
823
824
825
826 type cacheMutationCheck struct {
827 objects []cacheObject
828 }
829
830
831
832 type cacheObject struct {
833 original runtime.Object
834 deepCopy runtime.Object
835 }
836
837
838 func newCacheMutationCheck(endpointSlices []*discoveryv1.EndpointSlice) cacheMutationCheck {
839 cmc := cacheMutationCheck{}
840 for _, endpointSlice := range endpointSlices {
841 cmc.Add(endpointSlice)
842 }
843 return cmc
844 }
845
846
847
848 func (cmc *cacheMutationCheck) Add(o runtime.Object) {
849 cmc.objects = append(cmc.objects, cacheObject{
850 original: o,
851 deepCopy: o.DeepCopyObject(),
852 })
853 }
854
855
856 func (cmc *cacheMutationCheck) Check(t *testing.T) {
857 for _, o := range cmc.objects {
858 if !reflect.DeepEqual(o.original, o.deepCopy) {
859
860
861 t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
862 }
863 }
864 }
865
866 func toSet(s []string) map[string]struct{} {
867 set := map[string]struct{}{}
868 for _, k := range s {
869 set[k] = struct{}{}
870 }
871 return set
872 }
873
View as plain text