1 package destination
2
3 import (
4 "fmt"
5 "net/netip"
6 "sort"
7 "strings"
8 "sync"
9 "testing"
10
11 "github.com/go-test/deep"
12 pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
13 "github.com/linkerd/linkerd2-proxy-api/go/net"
14 "github.com/linkerd/linkerd2/controller/api/destination/watcher"
15 ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
16 "github.com/linkerd/linkerd2/pkg/addr"
17 "github.com/linkerd/linkerd2/pkg/k8s"
18 "google.golang.org/protobuf/proto"
19 corev1 "k8s.io/api/core/v1"
20 v1 "k8s.io/api/discovery/v1"
21 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22 )
23
// Address fixtures shared by the endpoint translator tests in this file.
var (
	// pod1 is a pod-backed IPv4 address whose pod declares a service account
	// name and which is owned by a replicationcontroller.
	pod1 = watcher.Address{
		IP:   "1.1.1.1",
		Port: 1,
		Pod: &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "pod1",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
			},
			Spec: corev1.PodSpec{
				ServiceAccountName: "serviceaccount-name",
			},
		},
		OwnerKind: "replicationcontroller",
		OwnerName: "rc-name",
	}

	// pod1IPv6 has the same pod name, namespace and port as pod1 but an IPv6
	// IP, i.e. it models the second address of a dual-stack pod.
	pod1IPv6 = watcher.Address{
		IP:   "2001:0db8:85a3:0000:0000:8a2e:0370:7333",
		Port: 1,
		Pod: &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "pod1",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
			},
			Spec: corev1.PodSpec{
				ServiceAccountName: "serviceaccount-name",
			},
		},
		OwnerKind: "replicationcontroller",
		OwnerName: "rc-name",
	}

	// pod2 is a pod-backed IPv4 address with no service account and no owner.
	pod2 = watcher.Address{
		IP:   "1.1.1.2",
		Port: 2,
		Pod: &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "pod2",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
			},
		},
	}

	// pod3 is a pod-backed IPv6 address with no service account and no owner.
	pod3 = watcher.Address{
		IP:   "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
		Port: 3,
		Pod: &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "pod3",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
			},
		},
	}

	// podOpaque is a pod-backed address marked as using an opaque protocol:
	// its port (4) is listed in the opaque-ports annotation and its proxy
	// container advertises an inbound listen address on port 4143.
	podOpaque = watcher.Address{
		IP:   "1.1.1.4",
		Port: 4,
		Pod: &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "pod4",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
				Annotations: map[string]string{
					k8s.ProxyOpaquePortsAnnotation: "4",
				},
			},
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name: k8s.ProxyContainerName,
						Env: []corev1.EnvVar{
							{
								Name:  envInboundListenAddr,
								Value: "0.0.0.0:4143",
							},
						},
					},
				},
			},
		},
		OpaqueProtocol: true,
	}

	// ew1 is an ExternalWorkload-backed address with a SPIFFE mesh TLS
	// identity, a server name, and a workloadgroup owner. It carries no labels.
	ew1 = watcher.Address{
		IP:   "1.1.1.1",
		Port: 1,
		ExternalWorkload: &ewv1beta1.ExternalWorkload{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "ew-1",
				Namespace: "ns",
			},
			Spec: ewv1beta1.ExternalWorkloadSpec{
				MeshTLS: ewv1beta1.MeshTLS{
					Identity:   "spiffe://some-domain/ew-1",
					ServerName: "server.local",
				},
			},
		},
		OwnerKind: "workloadgroup",
		OwnerName: "wg-name",
	}

	// ew2 is an ExternalWorkload-backed address with labels and mesh TLS
	// settings but no owner.
	ew2 = watcher.Address{
		IP:   "1.1.1.2",
		Port: 2,
		ExternalWorkload: &ewv1beta1.ExternalWorkload{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "ew-2",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
			},
			Spec: ewv1beta1.ExternalWorkloadSpec{
				MeshTLS: ewv1beta1.MeshTLS{
					Identity:   "spiffe://some-domain/ew-2",
					ServerName: "server.local",
				},
			},
		},
	}

	// ew3 mirrors ew2 with a different name, IP, port and identity.
	ew3 = watcher.Address{
		IP:   "1.1.1.3",
		Port: 3,
		ExternalWorkload: &ewv1beta1.ExternalWorkload{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "ew-3",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
			},
			Spec: ewv1beta1.ExternalWorkloadSpec{
				MeshTLS: ewv1beta1.MeshTLS{
					Identity:   "spiffe://some-domain/ew-3",
					ServerName: "server.local",
				},
			},
		},
	}

	// ewOpaque is the ExternalWorkload counterpart of podOpaque: port 4 is
	// listed in the opaque-ports annotation and the workload declares a port
	// named "linkerd-proxy" on 4143. Note the workload is named "pod4".
	ewOpaque = watcher.Address{
		IP:   "1.1.1.4",
		Port: 4,
		ExternalWorkload: &ewv1beta1.ExternalWorkload{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "pod4",
				Namespace: "ns",
				Annotations: map[string]string{
					k8s.ProxyOpaquePortsAnnotation: "4",
				},
			},
			Spec: ewv1beta1.ExternalWorkloadSpec{
				MeshTLS: ewv1beta1.MeshTLS{
					Identity:   "spiffe://some-domain/ew-opaque",
					ServerName: "server.local",
				},

				Ports: []ewv1beta1.PortSpec{
					{
						Port: 4143,
						Name: "linkerd-proxy",
					},
				},
			},
		},
		OpaqueProtocol: true,
	}

	// remoteGateway1 is a bare IP/port address with no identity.
	remoteGateway1 = watcher.Address{
		IP:   "1.1.1.1",
		Port: 1,
	}

	// remoteGateway2 is an IP/port address that carries an identity.
	remoteGateway2 = watcher.Address{
		IP:       "1.1.1.2",
		Port:     2,
		Identity: "some-identity",
	}

	// remoteGatewayAuthOverride is remoteGateway2 plus an authority override.
	remoteGatewayAuthOverride = watcher.Address{
		IP:                "1.1.1.2",
		Port:              2,
		Identity:          "some-identity",
		AuthorityOverride: "some-auth.com:2",
	}

	// west1aAddress is a bare address hinted for zone "west-1a".
	west1aAddress = watcher.Address{
		IP:   "1.1.1.1",
		Port: 1,
		ForZones: []v1.ForZone{
			{Name: "west-1a"},
		},
	}
	// west1bAddress is a bare address hinted for zone "west-1b".
	west1bAddress = watcher.Address{
		IP:   "1.1.1.1",
		Port: 2,
		ForZones: []v1.ForZone{
			{Name: "west-1b"},
		},
	}
	// AddressOnTest123Node is a pod-backed address whose pod is scheduled on
	// node "test-123".
	AddressOnTest123Node = watcher.Address{
		IP:   "1.1.1.1",
		Port: 1,
		Pod: &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "pod1",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
			},
			Spec: corev1.PodSpec{
				NodeName: "test-123",
			},
		},
	}
	// AddressNotOnTest123Node is a pod-backed address whose pod is scheduled
	// on a different node ("test-234").
	AddressNotOnTest123Node = watcher.Address{
		IP:   "1.1.1.2",
		Port: 2,
		Pod: &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "pod1",
				Namespace: "ns",
				Labels: map[string]string{
					k8s.ControllerNSLabel:    "linkerd",
					k8s.ProxyDeploymentLabel: "deployment-name",
				},
			},
			Spec: corev1.PodSpec{
				NodeName: "test-234",
			},
		},
	}
)
283
284 func TestEndpointTranslatorForRemoteGateways(t *testing.T) {
285 t.Run("Sends one update for add and another for remove", func(t *testing.T) {
286 mockGetServer, translator := makeEndpointTranslator(t)
287 translator.Start()
288 defer translator.Stop()
289
290 translator.Add(mkAddressSetForServices(remoteGateway1, remoteGateway2))
291 translator.Remove(mkAddressSetForServices(remoteGateway2))
292
293 expectedNumUpdates := 2
294 <-mockGetServer.updatesReceived
295 <-mockGetServer.updatesReceived
296
297 if len(mockGetServer.updatesReceived) != 0 {
298 t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
299 }
300 })
301
302 t.Run("Recovers after emptying address et", func(t *testing.T) {
303 mockGetServer, translator := makeEndpointTranslator(t)
304 translator.Start()
305 defer translator.Stop()
306
307 translator.Add(mkAddressSetForServices(remoteGateway1))
308 translator.Remove(mkAddressSetForServices(remoteGateway1))
309 translator.Add(mkAddressSetForServices(remoteGateway1))
310
311 expectedNumUpdates := 3
312 <-mockGetServer.updatesReceived
313 <-mockGetServer.updatesReceived
314 <-mockGetServer.updatesReceived
315
316 if len(mockGetServer.updatesReceived) != 0 {
317 t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
318 }
319 })
320
321 t.Run("Sends TlsIdentity when enabled", func(t *testing.T) {
322 expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
323 Name: "some-identity",
324 }
325
326 expectedProtocolHint := &pb.ProtocolHint{
327 Protocol: &pb.ProtocolHint_H2_{
328 H2: &pb.ProtocolHint_H2{},
329 },
330 }
331
332 mockGetServer, translator := makeEndpointTranslator(t)
333 translator.Start()
334 defer translator.Stop()
335
336 translator.Add(mkAddressSetForServices(remoteGateway2))
337
338 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
339 if len(addrs) != 1 {
340 t.Fatalf("Expected [1] address returned, got %v", addrs)
341 }
342
343 actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
344 if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
345 t.Fatalf("TlsIdentity: %v", diff)
346 }
347
348 actualProtocolHint := addrs[0].GetProtocolHint()
349 if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
350 t.Fatalf("ProtocolHint: %v", diff)
351 }
352 })
353
354 t.Run("Sends TlsIdentity and Auth override when present", func(t *testing.T) {
355 expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
356 Name: "some-identity",
357 }
358
359 expectedProtocolHint := &pb.ProtocolHint{
360 Protocol: &pb.ProtocolHint_H2_{
361 H2: &pb.ProtocolHint_H2{},
362 },
363 }
364
365 expectedAuthOverride := &pb.AuthorityOverride{
366 AuthorityOverride: "some-auth.com:2",
367 }
368
369 mockGetServer, translator := makeEndpointTranslator(t)
370 translator.Start()
371 defer translator.Stop()
372
373 translator.Add(mkAddressSetForServices(remoteGatewayAuthOverride))
374
375 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
376 if len(addrs) != 1 {
377 t.Fatalf("Expected [1] address returned, got %v", addrs)
378 }
379
380 actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
381 if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
382 t.Fatalf("TlsIdentity %v", diff)
383 }
384
385 actualProtocolHint := addrs[0].GetProtocolHint()
386 if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
387 t.Fatalf("ProtocolHint %v", diff)
388 }
389
390 actualAuthOverride := addrs[0].GetAuthorityOverride()
391 if diff := deep.Equal(actualAuthOverride, expectedAuthOverride); diff != nil {
392 t.Fatalf("AuthOverride %v", diff)
393 }
394 })
395
396 t.Run("Does not send TlsIdentity when not present", func(t *testing.T) {
397 mockGetServer, translator := makeEndpointTranslator(t)
398 translator.Start()
399 defer translator.Stop()
400
401 translator.Add(mkAddressSetForServices(remoteGateway1))
402
403 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
404 if len(addrs) != 1 {
405 t.Fatalf("Expected [1] address returned, got %v", addrs)
406 }
407
408 if addrs[0].TlsIdentity != nil {
409 t.Fatalf("Expected no TlsIdentity to be sent, but got [%v]", addrs[0].TlsIdentity)
410 }
411 if addrs[0].ProtocolHint != nil {
412 t.Fatalf("Expected no ProtocolHint to be sent, but got [%v]", addrs[0].TlsIdentity)
413 }
414 })
415
416 }
417
418 func TestEndpointTranslatorForPods(t *testing.T) {
419 t.Run("Sends one update for add and another for remove", func(t *testing.T) {
420 mockGetServer, translator := makeEndpointTranslator(t)
421 translator.Start()
422 defer translator.Stop()
423
424 translator.Add(mkAddressSetForPods(t, pod1, pod2))
425 translator.Remove(mkAddressSetForPods(t, pod2))
426
427 expectedNumUpdates := 2
428 <-mockGetServer.updatesReceived
429 <-mockGetServer.updatesReceived
430
431 if len(mockGetServer.updatesReceived) != 0 {
432 t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
433 }
434 })
435
436 t.Run("Sends addresses as removed or added", func(t *testing.T) {
437 mockGetServer, translator := makeEndpointTranslator(t)
438 translator.Start()
439 defer translator.Stop()
440
441 translator.Add(mkAddressSetForPods(t, pod1, pod2, pod3))
442 translator.Remove(mkAddressSetForPods(t, pod3))
443
444 addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
445 actualNumberOfAdded := len(addressesAdded)
446 expectedNumberOfAdded := 3
447 if actualNumberOfAdded != expectedNumberOfAdded {
448 t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
449 }
450
451 addressesRemoved := (<-mockGetServer.updatesReceived).GetRemove().Addrs
452 actualNumberOfRemoved := len(addressesRemoved)
453 expectedNumberOfRemoved := 1
454 if actualNumberOfRemoved != expectedNumberOfRemoved {
455 t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved)
456 }
457
458 sort.Slice(addressesAdded, func(i, j int) bool {
459 return addressesAdded[i].GetAddr().Port < addressesAdded[j].GetAddr().Port
460 })
461 checkAddressAndWeight(t, addressesAdded[0], pod1, defaultWeight)
462 checkAddressAndWeight(t, addressesAdded[1], pod2, defaultWeight)
463 checkAddress(t, addressesRemoved[0], pod3)
464 })
465
466 t.Run("Sends metric labels with added addresses", func(t *testing.T) {
467 mockGetServer, translator := makeEndpointTranslator(t)
468 translator.Start()
469 defer translator.Stop()
470
471 translator.Add(mkAddressSetForPods(t, pod1))
472
473 update := <-mockGetServer.updatesReceived
474
475 actualGlobalMetricLabels := update.GetAdd().MetricLabels
476 expectedGlobalMetricLabels := map[string]string{"namespace": "service-ns", "service": "service-name"}
477 if diff := deep.Equal(actualGlobalMetricLabels, expectedGlobalMetricLabels); diff != nil {
478 t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedGlobalMetricLabels, actualGlobalMetricLabels)
479 }
480
481 actualAddedAddress1MetricLabels := update.GetAdd().Addrs[0].MetricLabels
482 expectedAddedAddress1MetricLabels := map[string]string{
483 "pod": "pod1",
484 "replicationcontroller": "rc-name",
485 "serviceaccount": "serviceaccount-name",
486 "control_plane_ns": "linkerd",
487 "zone": "",
488 }
489 if diff := deep.Equal(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels); diff != nil {
490 t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
491 }
492 })
493
494 t.Run("Sends TlsIdentity when enabled", func(t *testing.T) {
495 expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
496 Name: "serviceaccount-name.ns.serviceaccount.identity.linkerd.trust.domain",
497 }
498
499 mockGetServer, translator := makeEndpointTranslator(t)
500 translator.Start()
501 defer translator.Stop()
502
503 translator.Add(mkAddressSetForPods(t, pod1))
504
505 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
506 if len(addrs) != 1 {
507 t.Fatalf("Expected [1] address returned, got %v", addrs)
508 }
509
510 actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
511 if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
512 t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
513 }
514 })
515
516 t.Run("Sends Opaque ProtocolHint for opaque ports", func(t *testing.T) {
517 expectedProtocolHint := &pb.ProtocolHint{
518 Protocol: &pb.ProtocolHint_Opaque_{
519 Opaque: &pb.ProtocolHint_Opaque{},
520 },
521 OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
522 InboundPort: 4143,
523 },
524 }
525
526 mockGetServer, translator := makeEndpointTranslator(t)
527 translator.Start()
528 defer translator.Stop()
529
530 translator.Add(mkAddressSetForServices(podOpaque))
531
532 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
533 if len(addrs) != 1 {
534 t.Fatalf("Expected [1] address returned, got %v", addrs)
535 }
536
537 actualProtocolHint := addrs[0].GetProtocolHint()
538 if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
539 t.Fatalf("ProtocolHint: %v", diff)
540 }
541 })
542
543 t.Run("Sends IPv6 only when pod has both IPv4 and IPv6", func(t *testing.T) {
544 mockGetServer, translator := makeEndpointTranslator(t)
545 translator.Start()
546 defer translator.Stop()
547
548 translator.Add(mkAddressSetForPods(t, pod1, pod1IPv6))
549
550 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
551 if len(addrs) != 1 {
552 t.Fatalf("Expected [1] address returned, got %v", addrs)
553 }
554 if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "[2001:db8:85a3::8a2e:370:7333]:1" {
555 t.Fatalf("Expected address to be [%s], got [%s]", "[2001:db8:85a3::8a2e:370:7333]:1", ipPort)
556 }
557
558 if updates := len(mockGetServer.updatesReceived); updates > 0 {
559 t.Fatalf("Expected to receive no more messages, received [%d]", updates)
560 }
561 })
562
563 t.Run("Sends IPv4 only when pod has both IPv4 and IPv6 but the latter in another zone ", func(t *testing.T) {
564 mockGetServer, translator := makeEndpointTranslator(t)
565 translator.Start()
566 defer translator.Stop()
567
568 pod1West1a := pod1
569 pod1West1a.ForZones = []v1.ForZone{
570 {Name: "west-1a"},
571 }
572
573 pod1IPv6West1b := pod1IPv6
574 pod1IPv6West1b.ForZones = []v1.ForZone{
575 {Name: "west-1b"},
576 }
577
578 translator.Add(mkAddressSetForPods(t, pod1West1a, pod1IPv6West1b))
579
580 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
581 if len(addrs) != 1 {
582 t.Fatalf("Expected [1] address returned, got %v", addrs)
583 }
584 if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "1.1.1.1:1" {
585 t.Fatalf("Expected address to be [%s], got [%s]", "1.1.1.1:1", ipPort)
586 }
587
588 if updates := len(mockGetServer.updatesReceived); updates > 0 {
589 t.Fatalf("Expected to receive no more messages, received [%d]", updates)
590 }
591 })
592 }
593
594 func TestEndpointTranslatorExternalWorkloads(t *testing.T) {
595 t.Run("Sends one update for add and another for remove", func(t *testing.T) {
596 mockGetServer, translator := makeEndpointTranslator(t)
597 translator.Start()
598 defer translator.Stop()
599
600 translator.Add(mkAddressSetForExternalWorkloads(ew1, ew2))
601 translator.Remove(mkAddressSetForExternalWorkloads(ew2))
602
603 expectedNumUpdates := 2
604 <-mockGetServer.updatesReceived
605 <-mockGetServer.updatesReceived
606
607 if len(mockGetServer.updatesReceived) != 0 {
608 t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
609 }
610 })
611
612 t.Run("Sends addresses as removed or added", func(t *testing.T) {
613 mockGetServer, translator := makeEndpointTranslator(t)
614 translator.Start()
615 defer translator.Stop()
616
617 translator.Add(mkAddressSetForExternalWorkloads(ew1, ew2, ew3))
618 translator.Remove(mkAddressSetForExternalWorkloads(ew3))
619
620 addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
621 actualNumberOfAdded := len(addressesAdded)
622 expectedNumberOfAdded := 3
623 if actualNumberOfAdded != expectedNumberOfAdded {
624 t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
625 }
626
627 addressesRemoved := (<-mockGetServer.updatesReceived).GetRemove().Addrs
628 actualNumberOfRemoved := len(addressesRemoved)
629 expectedNumberOfRemoved := 1
630 if actualNumberOfRemoved != expectedNumberOfRemoved {
631 t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved)
632 }
633
634 sort.Slice(addressesAdded, func(i, j int) bool {
635 return addressesAdded[i].GetAddr().Port < addressesAdded[j].GetAddr().Port
636 })
637 checkAddressAndWeight(t, addressesAdded[0], ew1, defaultWeight)
638 checkAddressAndWeight(t, addressesAdded[1], ew2, defaultWeight)
639 checkAddress(t, addressesRemoved[0], ew3)
640 })
641
642 t.Run("Sends metric labels with added addresses", func(t *testing.T) {
643 mockGetServer, translator := makeEndpointTranslator(t)
644 translator.Start()
645 defer translator.Stop()
646
647 translator.Add(mkAddressSetForExternalWorkloads(ew1))
648
649 update := <-mockGetServer.updatesReceived
650
651 actualGlobalMetricLabels := update.GetAdd().MetricLabels
652 expectedGlobalMetricLabels := map[string]string{"namespace": "service-ns", "service": "service-name"}
653 if diff := deep.Equal(actualGlobalMetricLabels, expectedGlobalMetricLabels); diff != nil {
654 t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedGlobalMetricLabels, actualGlobalMetricLabels)
655 }
656
657 actualAddedAddress1MetricLabels := update.GetAdd().Addrs[0].MetricLabels
658 expectedAddedAddress1MetricLabels := map[string]string{
659 "external_workload": "ew-1",
660 "zone": "",
661 "workloadgroup": "wg-name",
662 }
663 if diff := deep.Equal(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels); diff != nil {
664 t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
665 }
666 })
667
668 t.Run("Sends TlsIdentity and Server Name when enabled", func(t *testing.T) {
669 expectedTLSIdentity := &pb.TlsIdentity{
670 Strategy: &pb.TlsIdentity_UriLikeIdentity_{
671 UriLikeIdentity: &pb.TlsIdentity_UriLikeIdentity{
672 Uri: "spiffe://some-domain/ew-1",
673 },
674 },
675 ServerName: &pb.TlsIdentity_DnsLikeIdentity{
676 Name: "server.local",
677 },
678 }
679
680 mockGetServer, translator := makeEndpointTranslator(t)
681 translator.Start()
682 defer translator.Stop()
683
684 translator.Add(mkAddressSetForExternalWorkloads(ew1))
685 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
686 if len(addrs) != 1 {
687 t.Fatalf("Expected [1] address returned, got %v", addrs)
688 }
689
690 actualTLSIdentity := addrs[0].GetTlsIdentity()
691 if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
692 t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
693 }
694 })
695
696 t.Run("Sends Opaque ProtocolHint for opaque ports", func(t *testing.T) {
697 expectedProtocolHint := &pb.ProtocolHint{
698 Protocol: &pb.ProtocolHint_Opaque_{
699 Opaque: &pb.ProtocolHint_Opaque{},
700 },
701 OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
702 InboundPort: 4143,
703 },
704 }
705
706 mockGetServer, translator := makeEndpointTranslator(t)
707 translator.Start()
708 defer translator.Stop()
709
710 translator.Add(mkAddressSetForExternalWorkloads(ewOpaque))
711
712 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
713 if len(addrs) != 1 {
714 t.Fatalf("Expected [1] address returned, got %v", addrs)
715 }
716
717 actualProtocolHint := addrs[0].GetProtocolHint()
718 if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
719 t.Fatalf("ProtocolHint: %v", diff)
720 }
721 })
722 }
723
724 func TestEndpointTranslatorTopologyAwareFilter(t *testing.T) {
725 t.Run("Sends one update for add and none for remove", func(t *testing.T) {
726 mockGetServer, translator := makeEndpointTranslator(t)
727 translator.Start()
728 defer translator.Stop()
729
730 translator.Add(mkAddressSetForServices(west1aAddress, west1bAddress))
731 translator.Remove(mkAddressSetForServices(west1bAddress))
732
733
734
735
736 expectedNumUpdates := 1
737 <-mockGetServer.updatesReceived
738
739 if len(mockGetServer.updatesReceived) != 0 {
740 t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
741 }
742 })
743 }
744
745 func TestEndpointTranslatorExperimentalZoneWeights(t *testing.T) {
746 zoneA := "west-1a"
747 zoneB := "west-1b"
748 addrA := watcher.Address{
749 IP: "7.9.7.9",
750 Port: 7979,
751 Zone: &zoneA,
752 }
753 addrB := watcher.Address{
754 IP: "9.7.9.7",
755 Port: 9797,
756 Zone: &zoneB,
757 }
758
759 t.Run("Disabled", func(t *testing.T) {
760 mockGetServer, translator := makeEndpointTranslator(t)
761 translator.extEndpointZoneWeights = false
762 translator.Start()
763 defer translator.Stop()
764
765 translator.Add(mkAddressSetForServices(addrA, addrB))
766
767 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
768 if len(addrs) != 2 {
769 t.Fatalf("Expected [2] addresses returned, got %v", addrs)
770 }
771 sort.Slice(addrs, func(i, j int) bool {
772 return addrs[i].GetAddr().Port < addrs[j].GetAddr().Port
773 })
774 checkAddressAndWeight(t, addrs[0], addrA, defaultWeight)
775 checkAddressAndWeight(t, addrs[1], addrB, defaultWeight)
776 })
777
778 t.Run("Applies weights", func(t *testing.T) {
779 mockGetServer, translator := makeEndpointTranslator(t)
780 translator.extEndpointZoneWeights = true
781 translator.Start()
782 defer translator.Stop()
783
784 translator.Add(mkAddressSetForServices(addrA, addrB))
785
786 addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
787 if len(addrs) != 2 {
788 t.Fatalf("Expected [2] addresses returned, got %v", addrs)
789 }
790 sort.Slice(addrs, func(i, j int) bool {
791 return addrs[i].GetAddr().Port < addrs[j].GetAddr().Port
792 })
793 checkAddressAndWeight(t, addrs[0], addrA, defaultWeight*10)
794 checkAddressAndWeight(t, addrs[1], addrB, defaultWeight)
795 })
796 }
797
798 func TestEndpointTranslatorForLocalTrafficPolicy(t *testing.T) {
799 t.Run("Sends one update for add and none for remove", func(t *testing.T) {
800 mockGetServer, translator := makeEndpointTranslator(t)
801 translator.Start()
802 defer translator.Stop()
803 addressSet := mkAddressSetForServices(AddressOnTest123Node, AddressNotOnTest123Node)
804 addressSet.LocalTrafficPolicy = true
805 translator.Add(addressSet)
806 translator.Remove(mkAddressSetForServices(AddressNotOnTest123Node))
807
808
809
810
811 expectedNumUpdates := 1
812 <-mockGetServer.updatesReceived
813
814 if len(mockGetServer.updatesReceived) != 0 {
815 t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
816 }
817 })
818
819 t.Run("Removes cannot change LocalTrafficPolicy", func(t *testing.T) {
820 mockGetServer, translator := makeEndpointTranslator(t)
821 translator.Start()
822 defer translator.Stop()
823 addressSet := mkAddressSetForServices(AddressOnTest123Node, AddressNotOnTest123Node)
824 addressSet.LocalTrafficPolicy = true
825 translator.Add(addressSet)
826 set := watcher.AddressSet{
827 Addresses: make(map[watcher.ServiceID]watcher.Address),
828 Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
829 LocalTrafficPolicy: false,
830 }
831 translator.Remove(set)
832
833
834
835
836 expectedNumUpdates := 1
837 <-mockGetServer.updatesReceived
838
839 if len(mockGetServer.updatesReceived) != 0 {
840 t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
841 }
842 })
843 }
844
845
846 func TestConcurrency(t *testing.T) {
847 _, translator := makeEndpointTranslator(t)
848 translator.Start()
849 defer translator.Stop()
850
851 var wg sync.WaitGroup
852 for i := 0; i < 10; i++ {
853 wg.Add(1)
854 go func() {
855 defer wg.Done()
856 translator.Add(mkAddressSetForServices(west1aAddress, west1bAddress))
857 translator.Remove(mkAddressSetForServices(west1bAddress))
858 }()
859 }
860
861 wg.Wait()
862 }
863
864 func TestGetInboundPort(t *testing.T) {
865 podSpec := &corev1.PodSpec{
866 Containers: []corev1.Container{
867 {
868 Name: k8s.ProxyContainerName,
869 Env: []corev1.EnvVar{
870 {
871 Name: envInboundListenAddr,
872 Value: "1.2.3.4:8080",
873 },
874 },
875 },
876 },
877 }
878
879 port, err := getInboundPort(podSpec)
880 if err != nil {
881 t.Fatalf("Unexpected error: %s", err)
882 }
883 if port != 8080 {
884 t.Fatalf("Expecting port [%d], got [%d]", 8080, port)
885 }
886
887 podSpec.Containers[0].Env[0].Value = "[2001:db8::94]:8080"
888 port, err = getInboundPort(podSpec)
889 if err != nil {
890 t.Fatalf("Unexpected error: %s", err)
891 }
892 if port != 8080 {
893 t.Fatalf("Expecting port [%d], got [%d]", 8080, port)
894 }
895 }
896
897 func mkAddressSetForServices(gatewayAddresses ...watcher.Address) watcher.AddressSet {
898 set := watcher.AddressSet{
899 Addresses: make(map[watcher.ServiceID]watcher.Address),
900 Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
901 }
902 for _, a := range gatewayAddresses {
903 a := a
904
905 id := watcher.ServiceID{
906 Name: strings.Join([]string{
907 a.IP,
908 fmt.Sprint(a.Port),
909 }, "-"),
910 }
911 set.Addresses[id] = a
912 }
913 return set
914 }
915
916 func mkAddressSetForPods(t *testing.T, podAddresses ...watcher.Address) watcher.AddressSet {
917 t.Helper()
918
919 set := watcher.AddressSet{
920 Addresses: make(map[watcher.PodID]watcher.Address),
921 Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
922 }
923 for _, p := range podAddresses {
924
925
926 fam := corev1.IPv4Protocol
927 addr, err := netip.ParseAddr(p.IP)
928 if err != nil {
929 t.Fatalf("Invalid IP '%s': %s", p.IP, err)
930 }
931 if addr.Is6() {
932 fam = corev1.IPv6Protocol
933 }
934
935 id := watcher.PodID{
936 Name: p.Pod.Name,
937 Namespace: p.Pod.Namespace,
938 IPFamily: fam,
939 }
940 set.Addresses[id] = p
941 }
942 return set
943 }
944
945 func mkAddressSetForExternalWorkloads(ewAddresses ...watcher.Address) watcher.AddressSet {
946 set := watcher.AddressSet{
947 Addresses: make(map[watcher.PodID]watcher.Address),
948 Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
949 }
950 for _, ew := range ewAddresses {
951 id := watcher.ExternalWorkloadID{Name: ew.ExternalWorkload.Name, Namespace: ew.ExternalWorkload.Namespace}
952 set.Addresses[id] = ew
953 }
954 return set
955 }
956
957 func checkAddressAndWeight(t *testing.T, actual *pb.WeightedAddr, expected watcher.Address, weight uint32) {
958 t.Helper()
959
960 checkAddress(t, actual.GetAddr(), expected)
961 if actual.GetWeight() != weight {
962 t.Fatalf("Expected weight [%+v] but got [%+v]", weight, actual.GetWeight())
963 }
964 }
965
966 func checkAddress(t *testing.T, actual *net.TcpAddress, expected watcher.Address) {
967 t.Helper()
968
969 expectedAddr, err := addr.ParseProxyIP(expected.IP)
970 expectedTCP := net.TcpAddress{
971 Ip: expectedAddr,
972 Port: expected.Port,
973 }
974 if err != nil {
975 t.Fatalf("Failed to parse expected IP [%s]: %s", expected.IP, err)
976 }
977 if actual.Ip.GetIpv4() == 0 && actual.Ip.GetIpv6() == nil {
978 t.Fatal("Actual IP is empty")
979 }
980 if actual.Ip.GetIpv4() != expectedTCP.Ip.GetIpv4() {
981 t.Fatalf("Expected IPv4 [%+v] but got [%+v]", expectedTCP.Ip, actual.Ip)
982 }
983 if !proto.Equal(actual.Ip.GetIpv6(), expectedTCP.Ip.GetIpv6()) {
984 t.Fatalf("Expected IPv6 [%+v] but got [%+v]", expectedTCP.Ip, actual.Ip)
985 }
986 if actual.Port != expectedTCP.Port {
987 t.Fatalf("Expected port [%+v] but got [%+v]", expectedTCP.Port, actual.Port)
988 }
989 }
990
View as plain text