1 package destination
2
3 import (
4 "context"
5 "fmt"
6 gonet "net"
7 "net/netip"
8 "reflect"
9 "testing"
10 "time"
11
12 "github.com/golang/protobuf/ptypes/duration"
13 pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
14 "github.com/linkerd/linkerd2-proxy-api/go/net"
15 "github.com/linkerd/linkerd2/controller/api/destination/watcher"
16 "github.com/linkerd/linkerd2/controller/api/util"
17 "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
18 "github.com/linkerd/linkerd2/controller/k8s"
19 "github.com/linkerd/linkerd2/pkg/addr"
20 pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
21 "github.com/linkerd/linkerd2/testutil"
22 logging "github.com/sirupsen/logrus"
23 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/status"
25 corev1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/util/intstr"
28 )
29
// Authorities (service and pod DNS names) looked up by the tests in this
// package.
const (
	fullyQualifiedName              = "name1.ns.svc.mycluster.local"
	fullyQualifiedNameIPv6          = "name-ipv6.ns.svc.mycluster.local"
	fullyQualifiedNameDual          = "name-ds.ns.svc.mycluster.local"
	fullyQualifiedNameOpaque        = "name3.ns.svc.mycluster.local"
	fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
	fullyQualifiedNameSkipped       = "name5.ns.svc.mycluster.local"
	fullyQualifiedPodDNS            = "pod-0.statefulset-svc.ns.svc.mycluster.local"
)

// IP addresses referenced by the tests: cluster IPs, pod IPs, external IPs
// and external workload IPs, in both IPv4 and IPv6 flavours.
const (
	clusterIP                = "172.17.12.0"
	clusterIPv6              = "2001:db8::88"
	clusterIPOpaque          = "172.17.12.1"
	podIP1                   = "172.17.0.12"
	podIP1v6                 = "2001:db8::68"
	podIPv6Dual              = "2001:db8::94"
	podIP2                   = "172.17.0.13"
	podIPOpaque              = "172.17.0.14"
	podIPSkipped             = "172.17.0.15"
	podIPPolicy              = "172.17.0.16"
	podIPStatefulSet         = "172.17.13.15"
	externalIP               = "192.168.1.20"
	externalIPv6             = "2001:db8::78"
	externalWorkloadIP       = "200.1.1.1"
	externalWorkloadIPPolicy = "200.1.1.2"
)

// Ports used when building the authorities passed to the server. They are
// typed uint32 so they can be compared directly with port fields of that type.
const (
	port        uint32 = 8989
	opaquePort  uint32 = 4242
	skippedPort uint32 = 24224
)
55
56 func TestGet(t *testing.T) {
57 t.Run("Returns error if not valid service name", func(t *testing.T) {
58 server := makeServer(t)
59 defer server.clusterStore.UnregisterGauges()
60
61 stream := &bufferingGetStream{
62 updates: make(chan *pb.Update, 50),
63 MockServerStream: util.NewMockServerStream(),
64 }
65
66 err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: "linkerd.io"}, stream)
67 if err == nil {
68 t.Fatalf("Expecting error, got nothing")
69 }
70 })
71
72 t.Run("Returns InvalidArgument for ExternalName service", func(t *testing.T) {
73 server := makeServer(t)
74 defer server.clusterStore.UnregisterGauges()
75
76 stream := &bufferingGetStream{
77 updates: make(chan *pb.Update, 50),
78 MockServerStream: util.NewMockServerStream(),
79 }
80
81 err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: "externalname.ns.svc.cluster.local"}, stream)
82
83 code := status.Code(err)
84 if code != codes.InvalidArgument {
85 t.Fatalf("Expected InvalidArgument, got %s", code)
86 }
87 })
88
89 t.Run("Returns endpoints (IPv4)", func(t *testing.T) {
90 testReturnEndpoints(t, fullyQualifiedName, podIP1, port)
91 })
92
93 t.Run("Returns endpoints (IPv6)", func(t *testing.T) {
94 testReturnEndpoints(t, fullyQualifiedNameIPv6, podIP1v6, port)
95 })
96
97 t.Run("Returns endpoints (dual-stack)", func(t *testing.T) {
98 testReturnEndpoints(t, fullyQualifiedNameDual, podIPv6Dual, port)
99 })
100
101 t.Run("Sets meshed HTTP/2 client params", func(t *testing.T) {
102 server := makeServer(t)
103 http2Params := pb.Http2ClientParams{
104 KeepAlive: &pb.Http2ClientParams_KeepAlive{
105 Timeout: &duration.Duration{Seconds: 10},
106 Interval: &duration.Duration{Seconds: 20},
107 },
108 }
109 server.config.MeshedHttp2ClientParams = &http2Params
110 defer server.clusterStore.UnregisterGauges()
111
112 stream := &bufferingGetStream{
113 updates: make(chan *pb.Update, 50),
114 MockServerStream: util.NewMockServerStream(),
115 }
116 defer stream.Cancel()
117 errs := make(chan error)
118
119
120
121 go func() {
122 err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port)}, stream)
123 if err != nil {
124 errs <- err
125 }
126 }()
127
128 select {
129 case update := <-stream.updates:
130 add, ok := update.GetUpdate().(*pb.Update_Add)
131 if !ok {
132 t.Fatalf("Update expected to be an add, but was %+v", update)
133 }
134 addr := add.Add.Addrs[0]
135 if !reflect.DeepEqual(addr.GetHttp2(), &http2Params) {
136 t.Fatalf("Expected HTTP/2 client params to be %v, but got %v", &http2Params, addr.GetHttp2())
137 }
138 case err := <-errs:
139 t.Fatalf("Got error: %s", err)
140 }
141 })
142
143 t.Run("Does not set unmeshed HTTP/2 client params", func(t *testing.T) {
144 server := makeServer(t)
145 http2Params := pb.Http2ClientParams{
146 KeepAlive: &pb.Http2ClientParams_KeepAlive{
147 Timeout: &duration.Duration{Seconds: 10},
148 Interval: &duration.Duration{Seconds: 20},
149 },
150 }
151 server.config.MeshedHttp2ClientParams = &http2Params
152 defer server.clusterStore.UnregisterGauges()
153
154 stream := &bufferingGetStream{
155 updates: make(chan *pb.Update, 50),
156 MockServerStream: util.NewMockServerStream(),
157 }
158 defer stream.Cancel()
159 errs := make(chan error)
160
161
162
163 go func() {
164 err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", "name2.ns.svc.mycluster.local", port)}, stream)
165 if err != nil {
166 errs <- err
167 }
168 }()
169
170 select {
171 case update := <-stream.updates:
172 add, ok := update.GetUpdate().(*pb.Update_Add)
173 if !ok {
174 t.Fatalf("Update expected to be an add, but was %+v", update)
175 }
176 addr := add.Add.Addrs[0]
177 if addr.GetHttp2() != nil {
178 t.Fatalf("Expected HTTP/2 client params to be nil, but got %v", addr.GetHttp2())
179 }
180 case err := <-errs:
181 t.Fatalf("Got error: %s", err)
182 }
183 })
184
185 t.Run("Return endpoint with unknown protocol hint and identity when service name contains skipped inbound port", func(t *testing.T) {
186 server := makeServer(t)
187 defer server.clusterStore.UnregisterGauges()
188
189 stream := &bufferingGetStream{
190 updates: make(chan *pb.Update, 50),
191 MockServerStream: util.NewMockServerStream(),
192 }
193 defer stream.Cancel()
194 errs := make(chan error)
195
196 path := fmt.Sprintf("%s:%d", fullyQualifiedNameSkipped, skippedPort)
197
198
199
200 go func() {
201 err := server.Get(&pb.GetDestination{
202 Scheme: "k8s",
203 Path: path,
204 }, stream)
205 if err != nil {
206 errs <- err
207 }
208 }()
209
210 select {
211 case update := <-stream.updates:
212 addrs := update.GetAdd().Addrs
213 if len(addrs) == 0 {
214 t.Fatalf("Expected len(addrs) to be > 0")
215 }
216
217 if addrs[0].GetProtocolHint().GetProtocol() != nil || addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
218 t.Fatalf("Expected protocol hint for %s to be nil but got %+v", path, addrs[0].ProtocolHint)
219 }
220
221 if addrs[0].TlsIdentity != nil {
222 t.Fatalf("Expected TLS identity for %s to be nil but got %+v", path, addrs[0].TlsIdentity)
223 }
224 case err := <-errs:
225 t.Fatalf("Got error: %s", err)
226 }
227 })
228
229 t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
230 testOpaque(t, "policy-test")
231 })
232
233 t.Run("Return endpoint opaque protocol controlled by a server (native sidecar)", func(t *testing.T) {
234 testOpaque(t, "native")
235 })
236
237 t.Run("Remote discovery", func(t *testing.T) {
238 server := makeServer(t)
239 defer server.clusterStore.UnregisterGauges()
240
241
242 time.Sleep(50 * time.Millisecond)
243
244 stream := &bufferingGetStream{
245 updates: make(chan *pb.Update, 50),
246 MockServerStream: util.NewMockServerStream(),
247 }
248 defer stream.Cancel()
249 errs := make(chan error)
250
251
252
253 go func() {
254 err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", "foo-target.ns.svc.mycluster.local", 80)}, stream)
255 if err != nil {
256 errs <- err
257 }
258 }()
259
260 select {
261 case update := <-stream.updates:
262 if updateAddAddress(t, update)[0] != fmt.Sprintf("%s:%d", "172.17.55.1", 80) {
263 t.Fatalf("Expected %s but got %s", fmt.Sprintf("%s:%d", podIP1, port), updateAddAddress(t, update)[0])
264 }
265
266 if len(stream.updates) != 0 {
267 t.Fatalf("Expected 1 update but got %d: %v", 1+len(stream.updates), stream.updates)
268 }
269
270 case err := <-errs:
271 t.Fatalf("Got error: %s", err)
272 }
273 })
274 }
275
276 func testOpaque(t *testing.T, name string) {
277 server, client := getServerWithClient(t)
278 defer server.clusterStore.UnregisterGauges()
279
280 stream := &bufferingGetStream{
281 updates: make(chan *pb.Update, 50),
282 MockServerStream: util.NewMockServerStream(),
283 }
284 defer stream.Cancel()
285 errs := make(chan error)
286
287 path := fmt.Sprintf("%s.ns.svc.mycluster.local:%d", name, 80)
288
289
290
291 go func() {
292 err := server.Get(&pb.GetDestination{
293 Scheme: "k8s",
294 Path: path,
295 }, stream)
296 if err != nil {
297 errs <- err
298 }
299 }()
300
301 select {
302 case err := <-errs:
303 t.Fatalf("Got error: %s", err)
304 case update := <-stream.updates:
305 addrs := update.GetAdd().Addrs
306 if len(addrs) == 0 {
307 t.Fatalf("Expected len(addrs) to be > 0")
308 }
309
310 if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
311 t.Fatalf("Expected opaque transport for %s but was nil", path)
312 }
313 }
314
315
316
317
318 srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), name, metav1.GetOptions{})
319 if err != nil {
320 t.Fatal(err)
321 }
322
323 srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
324 _, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
325 if err != nil {
326 t.Fatal(err)
327 }
328
329 select {
330 case update := <-stream.updates:
331 addrs := update.GetAdd().Addrs
332 if len(addrs) == 0 {
333 t.Fatalf("Expected len(addrs) to be > 0")
334 }
335
336 if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
337 t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
338 }
339 case err := <-errs:
340 t.Fatalf("Got error: %s", err)
341 }
342
343
344
345
346 srv.Spec.PodSelector.MatchLabels = map[string]string{"app": name}
347
348 _, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
349 if err != nil {
350 t.Fatal(err)
351 }
352
353 select {
354 case update := <-stream.updates:
355 addrs := update.GetAdd().Addrs
356 if len(addrs) == 0 {
357 t.Fatalf("Expected len(addrs) to be > 0")
358 }
359
360 if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
361 t.Fatalf("Expected opaque transport for %s but was nil", path)
362 }
363 case err := <-errs:
364 t.Fatalf("Got error: %s", err)
365 }
366 }
367
368 func TestGetProfiles(t *testing.T) {
369 t.Run("Returns error if not valid service name", func(t *testing.T) {
370 server := makeServer(t)
371 defer server.clusterStore.UnregisterGauges()
372
373 stream := &bufferingGetProfileStream{
374 updates: []*pb.DestinationProfile{},
375 MockServerStream: util.NewMockServerStream(),
376 }
377 defer stream.Cancel()
378 err := server.GetProfile(&pb.GetDestination{Scheme: "k8s", Path: "linkerd.io"}, stream)
379 if err == nil {
380 t.Fatalf("Expecting error, got nothing")
381 }
382 })
383
384 t.Run("Returns InvalidArgument for ExternalName service", func(t *testing.T) {
385 server := makeServer(t)
386 defer server.clusterStore.UnregisterGauges()
387
388 stream := &bufferingGetProfileStream{
389 updates: []*pb.DestinationProfile{},
390 MockServerStream: util.NewMockServerStream(),
391 }
392 defer stream.Cancel()
393
394 err := server.GetProfile(&pb.GetDestination{Scheme: "k8s", Path: "externalname.ns.svc.cluster.local"}, stream)
395 code := status.Code(err)
396 if code != codes.InvalidArgument {
397 t.Fatalf("Expected InvalidArgument, got %s", code)
398 }
399 })
400
401 t.Run("Returns server profile", func(t *testing.T) {
402 server := makeServer(t)
403 defer server.clusterStore.UnregisterGauges()
404
405 stream := profileStream(t, server, fullyQualifiedName, port, "ns:other")
406 defer stream.Cancel()
407 profile := assertSingleProfile(t, stream.Updates())
408 if profile.FullyQualifiedName != fullyQualifiedName {
409 t.Fatalf("Expected fully qualified name '%s', but got '%s'",
410 fullyQualifiedName, profile.FullyQualifiedName)
411 }
412 if profile.OpaqueProtocol {
413 t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
414 }
415 routes := profile.GetRoutes()
416 if len(routes) != 1 {
417 t.Fatalf("Expected 0 routes but got %d: %v", len(routes), routes)
418 }
419 })
420
421 t.Run("Return service profile when using json token", func(t *testing.T) {
422 server := makeServer(t)
423 defer server.clusterStore.UnregisterGauges()
424
425 stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"other"}`)
426 defer stream.Cancel()
427 profile := assertSingleProfile(t, stream.Updates())
428 if profile.FullyQualifiedName != fullyQualifiedName {
429 t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
430 }
431 routes := profile.GetRoutes()
432 if len(routes) != 1 {
433 t.Fatalf("Expected 1 route got %d: %v", len(routes), routes)
434 }
435 })
436
437 t.Run("Returns client profile", func(t *testing.T) {
438 server := makeServer(t)
439 defer server.clusterStore.UnregisterGauges()
440
441 stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"client-ns"}`)
442 defer stream.Cancel()
443 profile := assertSingleProfile(t, stream.Updates())
444 routes := profile.GetRoutes()
445 if len(routes) != 1 {
446 t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
447 }
448 if !routes[0].GetIsRetryable() {
449 t.Fatalf("Expected route to be retryable, but it was not")
450 }
451 })
452
453 t.Run("Return profile when using cluster IP", func(t *testing.T) {
454 server := makeServer(t)
455 defer server.clusterStore.UnregisterGauges()
456
457 stream := profileStream(t, server, clusterIP, port, "")
458 defer stream.Cancel()
459 profile := assertSingleProfile(t, stream.Updates())
460 if profile.FullyQualifiedName != fullyQualifiedName {
461 t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
462 }
463 if profile.OpaqueProtocol {
464 t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
465 }
466 routes := profile.GetRoutes()
467 if len(routes) != 1 {
468 t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
469 }
470 })
471
472 t.Run("Return profile when using secondary cluster IP", func(t *testing.T) {
473 server := makeServer(t)
474 defer server.clusterStore.UnregisterGauges()
475
476 stream := profileStream(t, server, clusterIPv6, port, "")
477 defer stream.Cancel()
478 profile := assertSingleProfile(t, stream.Updates())
479 if profile.FullyQualifiedName != fullyQualifiedNameDual {
480 t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
481 }
482 if profile.OpaqueProtocol {
483 t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
484 }
485 routes := profile.GetRoutes()
486 if len(routes) != 1 {
487 t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
488 }
489 })
490
491 t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) {
492 server := makeServer(t)
493 defer server.clusterStore.UnregisterGauges()
494
495 stream := profileStream(t, server, fullyQualifiedPodDNS, port, "ns:ns")
496 defer stream.Cancel()
497
498 epAddr, err := toAddress(podIPStatefulSet, port)
499 if err != nil {
500 t.Fatalf("Got error: %s", err)
501 }
502
503
504
505 updates := stream.Updates()
506 if len(updates) == 0 || len(updates) > 3 {
507 t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
508 }
509
510 first := updates[0]
511 if first.Endpoint == nil {
512 t.Fatalf("Expected response to have endpoint field")
513 }
514 if first.OpaqueProtocol {
515 t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
516 }
517 _, exists := first.Endpoint.MetricLabels["namespace"]
518 if !exists {
519 t.Fatalf("Expected 'namespace' metric label to exist but it did not")
520 }
521 if first.GetEndpoint().GetProtocolHint() == nil {
522 t.Fatalf("Expected protocol hint but found none")
523 }
524 if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
525 t.Fatalf("Expected pod to not support opaque traffic on port %d", port)
526 }
527 if first.Endpoint.Addr.Ip.GetIpv4() == 0 && first.Endpoint.Addr.Ip.GetIpv6() == nil {
528 t.Fatal("IP is empty")
529 }
530 if first.Endpoint.Addr.String() != epAddr.String() {
531 t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
532 }
533 })
534
535 t.Run("Return profile with endpoint when using pod IP", func(t *testing.T) {
536 server := makeServer(t)
537 http2Params := pb.Http2ClientParams{
538 KeepAlive: &pb.Http2ClientParams_KeepAlive{
539 Timeout: &duration.Duration{Seconds: 10},
540 Interval: &duration.Duration{Seconds: 20},
541 },
542 }
543 server.config.MeshedHttp2ClientParams = &http2Params
544 defer server.clusterStore.UnregisterGauges()
545
546 stream := profileStream(t, server, podIP1, port, "ns:ns")
547 defer stream.Cancel()
548
549 epAddr, err := toAddress(podIP1, port)
550 if err != nil {
551 t.Fatalf("Got error: %s", err)
552 }
553
554
555
556 updates := stream.Updates()
557 if len(updates) == 0 || len(updates) > 3 {
558 t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
559 }
560
561 first := updates[0]
562 if first.Endpoint == nil {
563 t.Fatalf("Expected response to have endpoint field")
564 }
565 if first.OpaqueProtocol {
566 t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
567 }
568 _, exists := first.Endpoint.MetricLabels["namespace"]
569 if !exists {
570 t.Fatalf("Expected 'namespace' metric label to exist but it did not")
571 }
572 if first.GetEndpoint().GetProtocolHint() == nil {
573 t.Fatalf("Expected protocol hint but found none")
574 }
575 if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
576 t.Fatalf("Expected pod to not support opaque traffic on port %d", port)
577 }
578 if first.Endpoint.Addr.Ip.GetIpv4() == 0 && first.Endpoint.Addr.Ip.GetIpv6() == nil {
579 t.Fatal("IP is empty")
580 }
581 if first.Endpoint.Addr.String() != epAddr.String() {
582 t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
583 }
584 if !reflect.DeepEqual(first.Endpoint.GetHttp2(), &http2Params) {
585 t.Fatalf("Expected HTTP/2 client params to be %v, but got %v", &http2Params, first.Endpoint.GetHttp2())
586 }
587 })
588
589 t.Run("Return profile with endpoint when using pod secondary IP", func(t *testing.T) {
590 server := makeServer(t)
591 defer server.clusterStore.UnregisterGauges()
592
593 stream := profileStream(t, server, podIPv6Dual, port, "ns:ns")
594 defer stream.Cancel()
595
596 epAddr, err := toAddress(podIPv6Dual, port)
597 if err != nil {
598 t.Fatalf("Got error: %s", err)
599 }
600
601
602
603 updates := stream.Updates()
604 if len(updates) == 0 || len(updates) > 3 {
605 t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
606 }
607
608 first := updates[0]
609 if first.Endpoint == nil {
610 t.Fatalf("Expected response to have endpoint field")
611 }
612 if first.OpaqueProtocol {
613 t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
614 }
615 _, exists := first.Endpoint.MetricLabels["namespace"]
616 if !exists {
617 t.Fatalf("Expected 'namespace' metric label to exist but it did not")
618 }
619 if first.GetEndpoint().GetProtocolHint() == nil {
620 t.Fatalf("Expected protocol hint but found none")
621 }
622 if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
623 t.Fatalf("Expected pod to not support opaque traffic on port %d", port)
624 }
625 if first.Endpoint.Addr.Ip.GetIpv4() == 0 && first.Endpoint.Addr.Ip.GetIpv6() == nil {
626 t.Fatal("IP is empty")
627 }
628 if first.Endpoint.Addr.String() != epAddr.String() {
629 t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
630 }
631 })
632
633 t.Run("Return profile with endpoint when using externalworkload IP", func(t *testing.T) {
634 server := makeServer(t)
635 http2Params := pb.Http2ClientParams{
636 KeepAlive: &pb.Http2ClientParams_KeepAlive{
637 Timeout: &duration.Duration{Seconds: 10},
638 Interval: &duration.Duration{Seconds: 20},
639 },
640 }
641 server.config.MeshedHttp2ClientParams = &http2Params
642 defer server.clusterStore.UnregisterGauges()
643
644 stream := profileStream(t, server, externalWorkloadIP, port, "ns:ns")
645 defer stream.Cancel()
646
647 epAddr, err := toAddress(externalWorkloadIP, port)
648 if err != nil {
649 t.Fatalf("Got error: %s", err)
650 }
651
652
653
654 updates := stream.Updates()
655 if len(updates) == 0 || len(updates) > 3 {
656 t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
657 }
658
659 first := updates[0]
660 if first.Endpoint == nil {
661 t.Fatalf("Expected response to have endpoint field")
662 }
663 if first.OpaqueProtocol {
664 t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
665 }
666 _, exists := first.Endpoint.MetricLabels["namespace"]
667 if !exists {
668 t.Fatalf("Expected 'namespace' metric label to exist but it did not %v", first.Endpoint)
669 }
670 if first.GetEndpoint().GetProtocolHint() == nil {
671 t.Fatalf("Expected protocol hint but found none")
672 }
673 if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
674 t.Fatalf("Expected externalworkload to not support opaque traffic on port %d", port)
675 }
676 if first.Endpoint.Addr.Ip.GetIpv4() == 0 && first.Endpoint.Addr.Ip.GetIpv6() == nil {
677 t.Fatal("IP is empty")
678 }
679 if first.Endpoint.Addr.String() != epAddr.String() {
680 t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
681 }
682 if !reflect.DeepEqual(first.Endpoint.GetHttp2(), &http2Params) {
683 t.Fatalf("Expected HTTP/2 client params to be %v, but got %v", &http2Params, first.Endpoint.GetHttp2())
684 }
685 })
686
687 t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) {
688 server := makeServer(t)
689 defer server.clusterStore.UnregisterGauges()
690
691 stream := profileStream(t, server, "172.0.0.0", 1234, "")
692 defer stream.Cancel()
693 profile := assertSingleProfile(t, stream.Updates())
694 if profile.RetryBudget == nil {
695 t.Fatalf("Expected default profile to have a retry budget")
696 }
697 })
698
699 t.Run("Return profile with no opaque transport when pod does not have label and port is opaque", func(t *testing.T) {
700 server := makeServer(t)
701 defer server.clusterStore.UnregisterGauges()
702
703
704 stream := profileStream(t, server, podIP2, 3306, "")
705 defer stream.Cancel()
706 profile := assertSingleProfile(t, stream.Updates())
707 if profile.Endpoint == nil {
708 t.Fatalf("Expected response to have endpoint field")
709 }
710
711 if profile.Endpoint.GetProtocolHint().GetOpaqueTransport() != nil {
712 t.Fatalf("Expected no opaque transport but found one")
713 }
714 if profile.GetEndpoint().GetHttp2() != nil {
715 t.Fatalf("Expected no HTTP/2 client parameters but found one")
716 }
717 })
718
719 t.Run("Return profile with no protocol hint when pod does not have label", func(t *testing.T) {
720 server := makeServer(t)
721 defer server.clusterStore.UnregisterGauges()
722
723 stream := profileStream(t, server, podIP2, port, "")
724 defer stream.Cancel()
725 profile := assertSingleProfile(t, stream.Updates())
726 if profile.Endpoint == nil {
727 t.Fatalf("Expected response to have endpoint field")
728 }
729 if profile.Endpoint.GetProtocolHint().GetProtocol() != nil || profile.Endpoint.GetProtocolHint().GetOpaqueTransport() != nil {
730 t.Fatalf("Expected no protocol hint but found one")
731 }
732 })
733
734 t.Run("Return profile with protocol hint for default opaque port when pod is unmeshed", func(t *testing.T) {
735 server := makeServer(t)
736 defer server.clusterStore.UnregisterGauges()
737
738
739 stream := profileStream(t, server, podIP2, 3306, "")
740 defer stream.Cancel()
741 profile := assertSingleProfile(t, stream.Updates())
742 if profile.Endpoint == nil {
743 t.Fatalf("Expected response to have endpoint field")
744 }
745 if !profile.OpaqueProtocol {
746 t.Fatal("Expected port 3306 to be an opaque protocol, but it was not")
747 }
748 if profile.GetEndpoint().GetProtocolHint() != nil {
749 t.Fatalf("Expected protocol hint to be nil")
750 }
751 })
752
753 t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) {
754 server := makeServer(t)
755 defer server.clusterStore.UnregisterGauges()
756
757 stream := profileStream(t, server, clusterIPOpaque, opaquePort, "")
758 defer stream.Cancel()
759 profile := assertSingleProfile(t, stream.Updates())
760 if profile.FullyQualifiedName != fullyQualifiedNameOpaque {
761 t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaque, profile.FullyQualifiedName)
762 }
763 if profile.OpaqueProtocol {
764 t.Fatalf("Expected port %d to not be an opaque protocol, but it was", opaquePort)
765 }
766 })
767
768 t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) {
769 server := makeServer(t)
770 defer server.clusterStore.UnregisterGauges()
771
772 stream := profileStream(t, server, podIPOpaque, opaquePort, "")
773 defer stream.Cancel()
774
775 epAddr, err := toAddress(podIPOpaque, opaquePort)
776 if err != nil {
777 t.Fatalf("Got error: %s", err)
778 }
779
780
781
782 updates := stream.Updates()
783 if len(updates) == 0 || len(updates) > 3 {
784 t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
785 }
786
787 profile := assertSingleProfile(t, updates)
788 if profile.Endpoint == nil {
789 t.Fatalf("Expected response to have endpoint field")
790 }
791 if !profile.OpaqueProtocol {
792 t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
793 }
794 _, exists := profile.Endpoint.MetricLabels["namespace"]
795 if !exists {
796 t.Fatalf("Expected 'namespace' metric label to exist but it did not")
797 }
798 if profile.Endpoint.ProtocolHint == nil {
799 t.Fatalf("Expected protocol hint but found none")
800 }
801 if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
802 t.Fatalf("Expected pod to support opaque traffic on port 4143")
803 }
804 if profile.Endpoint.Addr.Ip.GetIpv4() == 0 && profile.Endpoint.Addr.Ip.GetIpv6() == nil {
805 t.Fatal("IP is empty")
806 }
807 if profile.Endpoint.Addr.String() != epAddr.String() {
808 t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, profile.Endpoint.Addr.Port)
809 }
810 })
811
812 t.Run("Return opaque protocol profile when using service name with opaque port annotation", func(t *testing.T) {
813 server := makeServer(t)
814 defer server.clusterStore.UnregisterGauges()
815
816 stream := profileStream(t, server, fullyQualifiedNameOpaqueService, opaquePort, "")
817 defer stream.Cancel()
818 profile := assertSingleProfile(t, stream.Updates())
819 if profile.FullyQualifiedName != fullyQualifiedNameOpaqueService {
820 t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaqueService, profile.FullyQualifiedName)
821 }
822 if !profile.OpaqueProtocol {
823 t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
824 }
825 })
826
827 t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) {
828 server := makeServer(t)
829 defer server.clusterStore.UnregisterGauges()
830
831 stream := profileStream(t, server, podIPSkipped, skippedPort, "")
832 defer stream.Cancel()
833 profile := assertSingleProfile(t, stream.Updates())
834 addr := profile.GetEndpoint()
835 if addr == nil {
836 t.Fatalf("Expected to not be nil")
837 }
838 if addr.GetProtocolHint().GetProtocol() != nil || addr.GetProtocolHint().GetOpaqueTransport() != nil {
839 t.Fatalf("Expected protocol hint for %s to be nil but got %+v", podIPSkipped, addr.ProtocolHint)
840 }
841 if addr.TlsIdentity != nil {
842 t.Fatalf("Expected TLS identity for %s to be nil but got %+v", podIPSkipped, addr.TlsIdentity)
843 }
844 })
845
846 t.Run("Return opaque protocol profile with endpoint when using externalworkload IP and opaque protocol port", func(t *testing.T) {
847 server := makeServer(t)
848 defer server.clusterStore.UnregisterGauges()
849
850 stream := profileStream(t, server, externalWorkloadIP, opaquePort, "")
851 defer stream.Cancel()
852
853 epAddr, err := toAddress(externalWorkloadIP, opaquePort)
854 if err != nil {
855 t.Fatalf("Got error: %s", err)
856 }
857
858
859
860 updates := stream.Updates()
861 if len(updates) == 0 || len(updates) > 3 {
862 t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
863 }
864
865 profile := assertSingleProfile(t, updates)
866 if profile.Endpoint == nil {
867 t.Fatalf("Expected response to have endpoint field")
868 }
869 if !profile.OpaqueProtocol {
870 t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
871 }
872 _, exists := profile.Endpoint.MetricLabels["namespace"]
873 if !exists {
874 t.Fatalf("Expected 'namespace' metric label to exist but it did not")
875 }
876 if profile.Endpoint.ProtocolHint == nil {
877 t.Fatalf("Expected protocol hint but found none")
878 }
879 if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
880 t.Fatalf("Expected pod to support opaque traffic on port 4143")
881 }
882 if profile.Endpoint.Addr.Ip.GetIpv4() == 0 && profile.Endpoint.Addr.Ip.GetIpv6() == nil {
883 t.Fatal("IP is empty")
884 }
885 if profile.Endpoint.Addr.String() != epAddr.String() {
886 t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, profile.Endpoint.Addr.Port)
887 }
888 })
889
890 t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) {
891 server := makeServer(t)
892 defer server.clusterStore.UnregisterGauges()
893
894 stream := profileStream(t, server, podIPPolicy, 80, "")
895 defer stream.Cancel()
896 profile := assertSingleProfile(t, stream.Updates())
897 if profile.Endpoint == nil {
898 t.Fatalf("Expected response to have endpoint field")
899 }
900 if !profile.OpaqueProtocol {
901 t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80)
902 }
903 if profile.Endpoint.GetProtocolHint() == nil {
904 t.Fatalf("Expected protocol hint but found none")
905 }
906 if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
907 t.Fatalf("Expected pod to support opaque traffic on port 4143")
908 }
909 })
910
911 t.Run("Return profile with opaque protocol when using externalworkload IP selected by a Server", func(t *testing.T) {
912 server := makeServer(t)
913 defer server.clusterStore.UnregisterGauges()
914
915 stream := profileStream(t, server, externalWorkloadIPPolicy, 80, "")
916 defer stream.Cancel()
917 profile := assertSingleProfile(t, stream.Updates())
918 if profile.Endpoint == nil {
919 t.Fatalf("Expected response to have endpoint field")
920 }
921 if !profile.OpaqueProtocol {
922 t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80)
923 }
924 if profile.Endpoint.GetProtocolHint() == nil {
925 t.Fatalf("Expected protocol hint but found none")
926 }
927 if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
928 t.Fatalf("Expected pod to support opaque traffic on port 4143")
929 }
930 })
931
932 t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) {
933 server := makeServer(t)
934 defer server.clusterStore.UnregisterGauges()
935
936 stream := profileStream(t, server, externalIP, 3306, "")
937 defer stream.Cancel()
938 profile := assertSingleProfile(t, stream.Updates())
939 if !profile.OpaqueProtocol {
940 t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 3306)
941 }
942
943 })
944
945 t.Run("Return profile with non-opaque protocol when using an arbitrary port with an external IP", func(t *testing.T) {
946 server := makeServer(t)
947 defer server.clusterStore.UnregisterGauges()
948
949 stream := profileStream(t, server, externalIP, 80, "")
950 defer stream.Cancel()
951 profile := assertSingleProfile(t, stream.Updates())
952 if profile.OpaqueProtocol {
953 t.Fatalf("Expected port %d to be a non-opaque protocol, but it was opaque", 80)
954 }
955 })
956
957 t.Run("Return profile for host port pods", func(t *testing.T) {
958 hostPort := uint32(7777)
959 containerPort := uint32(80)
960 server, l5dClient := getServerWithClient(t)
961 defer server.clusterStore.UnregisterGauges()
962
963 stream := profileStream(t, server, externalIP, hostPort, "")
964 defer stream.Cancel()
965
966
967 profile := assertSingleProfile(t, stream.Updates())
968 dstPod := profile.Endpoint.MetricLabels["pod"]
969 if dstPod != "hostport-mapping" {
970 t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping", dstPod)
971 }
972
973 ip, err := addr.ParseProxyIP(externalIP)
974 if err != nil {
975 t.Fatalf("Error parsing IP: %s", err)
976 }
977 addr := profile.Endpoint.Addr
978 if addr.Ip.String() != ip.String() && addr.Port != hostPort {
979 t.Fatalf("Expected endpoint addr to be %s port:%d got %s", ip, hostPort, addr)
980 }
981
982
983 err = server.k8sAPI.Client.CoreV1().Pods("ns").Delete(context.Background(), "hostport-mapping", metav1.DeleteOptions{})
984 if err != nil {
985 t.Fatalf("Failed to delete pod: %s", err)
986 }
987 err = testutil.RetryFor(time.Second*10, func() error {
988 updates := stream.Updates()
989 if len(updates) < 2 {
990 return fmt.Errorf("expected 2 updates, got %d", len(updates))
991 }
992 return nil
993 })
994 if err != nil {
995 t.Fatal(err)
996 }
997 profile = stream.Updates()[1]
998 dstPod = profile.Endpoint.MetricLabels["pod"]
999 if dstPod != "" {
1000 t.Fatalf("Expected no dst_pod but got %s", dstPod)
1001 }
1002
1003
1004 _, err = server.k8sAPI.Client.CoreV1().Pods("ns").Create(context.Background(), &corev1.Pod{
1005 ObjectMeta: metav1.ObjectMeta{
1006 Name: "hostport-mapping-2",
1007 Namespace: "ns",
1008 Labels: map[string]string{
1009 "app": "hostport-mapping-2",
1010 },
1011 },
1012 Spec: corev1.PodSpec{
1013 Containers: []corev1.Container{
1014 {
1015 Name: pkgk8s.ProxyContainerName,
1016 Env: []corev1.EnvVar{
1017 {
1018 Name: "LINKERD2_PROXY_INBOUND_LISTEN_ADDR",
1019 Value: "0.0.0.0:4143",
1020 },
1021 },
1022 },
1023 {
1024 Name: "nginx",
1025 Image: "nginx",
1026 Ports: []corev1.ContainerPort{
1027 {
1028 Name: "nginx-7777",
1029 ContainerPort: (int32)(containerPort),
1030 HostPort: (int32)(hostPort),
1031 },
1032 },
1033 },
1034 },
1035 },
1036 Status: corev1.PodStatus{
1037 Phase: "Running",
1038 Conditions: []corev1.PodCondition{
1039 {
1040 Type: corev1.PodReady,
1041 Status: corev1.ConditionTrue,
1042 },
1043 },
1044 HostIP: externalIP,
1045 HostIPs: []corev1.HostIP{{IP: externalIP}, {IP: externalIPv6}},
1046 PodIP: "172.17.0.55",
1047 PodIPs: []corev1.PodIP{{IP: "172.17.0.55"}},
1048 },
1049 }, metav1.CreateOptions{})
1050 if err != nil {
1051 t.Fatalf("Failed to create pod: %s", err)
1052 }
1053
1054 err = testutil.RetryFor(time.Second*10, func() error {
1055 updates := stream.Updates()
1056 if len(updates) < 3 {
1057 return fmt.Errorf("expected 3 updates, got %d", len(updates))
1058 }
1059 return nil
1060 })
1061 if err != nil {
1062 t.Fatal(err)
1063 }
1064
1065 profile = stream.Updates()[2]
1066 dstPod = profile.Endpoint.MetricLabels["pod"]
1067 if dstPod != "hostport-mapping-2" {
1068 t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping-2", dstPod)
1069 }
1070 if profile.OpaqueProtocol {
1071 t.Fatal("Expected OpaqueProtocol=false")
1072 }
1073
1074
1075 l5dClient.ServerV1beta2().Servers("ns").Create(context.Background(), &v1beta2.Server{
1076 ObjectMeta: metav1.ObjectMeta{
1077 Name: "srv-hostport-mapping-2",
1078 Namespace: "ns",
1079 },
1080 Spec: v1beta2.ServerSpec{
1081 PodSelector: &metav1.LabelSelector{
1082 MatchLabels: map[string]string{
1083 "app": "hostport-mapping-2",
1084 },
1085 },
1086 Port: intstr.IntOrString{
1087 Type: intstr.String,
1088 StrVal: "nginx-7777",
1089 },
1090 ProxyProtocol: "opaque",
1091 },
1092 }, metav1.CreateOptions{})
1093
1094 var updates []*pb.DestinationProfile
1095 err = testutil.RetryFor(time.Second*10, func() error {
1096 updates = stream.Updates()
1097 if len(updates) < 4 {
1098 return fmt.Errorf("expected 4 updates, got %d", len(updates))
1099 }
1100 return nil
1101 })
1102 if err != nil {
1103 t.Fatal(err)
1104 }
1105
1106 profile = stream.Updates()[3]
1107 if !profile.OpaqueProtocol {
1108 t.Fatal("Expected OpaqueProtocol=true")
1109 }
1110 })
1111 }
1112
1113 func TestTokenStructure(t *testing.T) {
1114 t.Run("when JSON is valid", func(t *testing.T) {
1115 server := makeServer(t)
1116 defer server.clusterStore.UnregisterGauges()
1117
1118 dest := &pb.GetDestination{ContextToken: "{\"ns\":\"ns-1\",\"nodeName\":\"node-1\"}\n"}
1119 token := server.parseContextToken(dest.ContextToken)
1120
1121 if token.Ns != "ns-1" {
1122 t.Fatalf("Expected token namespace to be %s got %s", "ns-1", token.Ns)
1123 }
1124
1125 if token.NodeName != "node-1" {
1126 t.Fatalf("Expected token nodeName to be %s got %s", "node-1", token.NodeName)
1127 }
1128 })
1129
1130 t.Run("when JSON is invalid and old token format used", func(t *testing.T) {
1131 server := makeServer(t)
1132 defer server.clusterStore.UnregisterGauges()
1133
1134 dest := &pb.GetDestination{ContextToken: "ns:ns-2"}
1135 token := server.parseContextToken(dest.ContextToken)
1136 if token.Ns != "ns-2" {
1137 t.Fatalf("Expected %s got %s", "ns-2", token.Ns)
1138 }
1139 })
1140
1141 t.Run("when invalid JSON and invalid old format", func(t *testing.T) {
1142 server := makeServer(t)
1143 server.clusterStore.UnregisterGauges()
1144
1145 dest := &pb.GetDestination{ContextToken: "123fa-test"}
1146 token := server.parseContextToken(dest.ContextToken)
1147 if token.Ns != "" || token.NodeName != "" {
1148 t.Fatalf("Expected context token to be empty, got %v", token)
1149 }
1150 })
1151 }
1152
1153 func updateAddAddress(t *testing.T, update *pb.Update) []string {
1154 t.Helper()
1155 add, ok := update.GetUpdate().(*pb.Update_Add)
1156 if !ok {
1157 t.Fatalf("Update expected to be an add, but was %+v", update)
1158 }
1159 ips := []string{}
1160 for _, ip := range add.Add.Addrs {
1161 ips = append(ips, addr.ProxyAddressToString(ip.GetAddr()))
1162 }
1163 return ips
1164 }
1165
1166 func updateRemoveAddress(t *testing.T, update *pb.Update) []string {
1167 t.Helper()
1168 add, ok := update.GetUpdate().(*pb.Update_Remove)
1169 if !ok {
1170 t.Fatalf("Update expected to be a remove, but was %+v", update)
1171 }
1172 ips := []string{}
1173 for _, ip := range add.Remove.Addrs {
1174 ips = append(ips, addr.ProxyAddressToString(ip))
1175 }
1176 return ips
1177 }
1178
1179 func toAddress(path string, port uint32) (*net.TcpAddress, error) {
1180 ip, err := addr.ParseProxyIP(path)
1181 if err != nil {
1182 return nil, err
1183 }
1184 return &net.TcpAddress{
1185 Ip: ip,
1186 Port: port,
1187 }, nil
1188 }
1189
1190 func TestIpWatcherGetSvcID(t *testing.T) {
1191 name := "service"
1192 namespace := "test"
1193 clusterIP := "10.245.0.1"
1194 k8sConfigs := `
1195 apiVersion: v1
1196 kind: Service
1197 metadata:
1198 name: service
1199 namespace: test
1200 spec:
1201 type: ClusterIP
1202 clusterIP: 10.245.0.1
1203 clusterIPs:
1204 - 10.245.0.1
1205 - 2001:db8::88
1206 ports:
1207 - port: 1234`
1208
1209 t.Run("get services IDs by IP address", func(t *testing.T) {
1210 k8sAPI, err := k8s.NewFakeAPI(k8sConfigs)
1211 if err != nil {
1212 t.Fatalf("NewFakeAPI returned an error: %s", err)
1213 }
1214
1215 err = watcher.InitializeIndexers(k8sAPI)
1216 if err != nil {
1217 t.Fatalf("InitializeIndexers returned an error: %s", err)
1218 }
1219
1220 k8sAPI.Sync(nil)
1221
1222 svc, err := getSvcID(k8sAPI, clusterIP, logging.WithFields(nil))
1223 if err != nil {
1224 t.Fatalf("Error getting service: %s", err)
1225 }
1226 if svc == nil {
1227 t.Fatalf("Expected to find service mapped to [%s]", clusterIP)
1228 }
1229 if svc.Name != name {
1230 t.Fatalf("Expected service name to be [%s], but got [%s]", name, svc.Name)
1231 }
1232 if svc.Namespace != namespace {
1233 t.Fatalf("Expected service namespace to be [%s], but got [%s]", namespace, svc.Namespace)
1234 }
1235
1236 svc6, err := getSvcID(k8sAPI, clusterIPv6, logging.WithFields(nil))
1237 if err != nil {
1238 t.Fatalf("Error getting service: %s", err)
1239 }
1240 if svc6 == nil {
1241 t.Fatalf("Expected to find service mapped to [%s]", clusterIPv6)
1242 }
1243 if svc.Name != name {
1244 t.Fatalf("Expected service name to be [%s], but got [%s]", name, svc.Name)
1245 }
1246 if svc.Namespace != namespace {
1247 t.Fatalf("Expected service namespace to be [%s], but got [%s]", namespace, svc.Namespace)
1248 }
1249
1250 badClusterIP := "10.256.0.2"
1251 svc, err = getSvcID(k8sAPI, badClusterIP, logging.WithFields(nil))
1252 if err != nil {
1253 t.Fatalf("Error getting service: %s", err)
1254 }
1255 if svc != nil {
1256 t.Fatalf("Expected not to find service mapped to [%s]", badClusterIP)
1257 }
1258 })
1259 }
1260
1261 func testReturnEndpoints(t *testing.T, fqdn, ip string, port uint32) {
1262 t.Helper()
1263
1264 server := makeServer(t)
1265 defer server.clusterStore.UnregisterGauges()
1266
1267 stream := &bufferingGetStream{
1268 updates: make(chan *pb.Update, 50),
1269 MockServerStream: util.NewMockServerStream(),
1270 }
1271 defer stream.Cancel()
1272
1273 testReturnEndpointsForServer(t, server, stream, fqdn, ip, port)
1274 }
1275
1276 func testReturnEndpointsForServer(t *testing.T, server *server, stream *bufferingGetStream, fqdn, ip string, port uint32) {
1277 t.Helper()
1278
1279 errs := make(chan error)
1280
1281
1282 go func() {
1283 err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", fqdn, port)}, stream)
1284 if err != nil {
1285 errs <- err
1286 }
1287 }()
1288
1289 addr := fmt.Sprintf("%s:%d", ip, port)
1290 parsedIP, err := netip.ParseAddr(ip)
1291 if err != nil {
1292 t.Fatalf("Invalid IP [%s]: %s", ip, err)
1293 }
1294 if parsedIP.Is6() {
1295 addr = fmt.Sprintf("[%s]:%d", ip, port)
1296 }
1297
1298 select {
1299 case update := <-stream.updates:
1300 if updateAddAddress(t, update)[0] != addr {
1301 t.Fatalf("Expected %s but got %s", addr, updateAddAddress(t, update)[0])
1302 }
1303
1304 if len(stream.updates) != 0 {
1305 t.Fatalf("Expected 1 update but got %d: %v", 1+len(stream.updates), stream.updates)
1306 }
1307 case err := <-errs:
1308 t.Fatalf("Got error: %s", err)
1309 }
1310 }
1311
1312 func assertSingleProfile(t *testing.T, updates []*pb.DestinationProfile) *pb.DestinationProfile {
1313 t.Helper()
1314
1315
1316
1317
1318 if len(updates) != 1 {
1319 t.Fatalf("Expected 1 profile update but got %d: %v", len(updates), updates)
1320 }
1321 return updates[0]
1322 }
1323
1324 func profileStream(t *testing.T, server *server, host string, port uint32, token string) *bufferingGetProfileStream {
1325 t.Helper()
1326
1327 stream := &bufferingGetProfileStream{
1328 updates: []*pb.DestinationProfile{},
1329 MockServerStream: util.NewMockServerStream(),
1330 }
1331
1332 go func() {
1333 err := server.GetProfile(&pb.GetDestination{
1334 Scheme: "k8s",
1335 Path: gonet.JoinHostPort(host, fmt.Sprintf("%d", port)),
1336 ContextToken: token,
1337 }, stream)
1338 if err != nil {
1339 logging.Fatalf("Got error: %s", err)
1340 }
1341 }()
1342
1343 time.Sleep(50 * time.Millisecond)
1344
1345 return stream
1346 }
1347
View as plain text