1use futures::prelude::*;
2use kube::ResourceExt;
3use linkerd_policy_controller_k8s_api as k8s;
4use linkerd_policy_test::{
5 assert_default_accrual_backoff, assert_svc_meta, create, create_annotated_service,
6 create_cluster_scoped, create_opaque_service, create_service, delete_cluster_scoped, grpc,
7 mk_service, with_temp_ns,
8};
9use maplit::{btreemap, convert_args};
10use std::{collections::BTreeMap, time::Duration};
11use tokio::time;
12
13// These tests are copies of the tests in outbound_api_gateway.rs but using the
14// policy.linkerd.io HttpRoute kubernetes types instead of the Gateway API ones.
15// These two files should be kept in sync to ensure that Linkerd can read and
16// function correctly with both types of resources.
17
18#[tokio::test(flavor = "current_thread")]
19async fn service_does_not_exist() {
20 with_temp_ns(|client, ns| async move {
21 // Build a service but don't apply it to the cluster.
22 let mut svc = mk_service(&ns, "my-svc", 4191);
23 // Give it a bogus cluster ip.
24 svc.spec.as_mut().unwrap().cluster_ip = Some("1.1.1.1".to_string());
25
26 let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(&client).await;
27 let rsp = policy_api.watch(&ns, &svc, 4191).await;
28
29 assert!(rsp.is_err());
30 assert_eq!(rsp.err().unwrap().code(), tonic::Code::NotFound);
31 })
32 .await;
33}
34
35#[tokio::test(flavor = "current_thread")]
36async fn service_with_no_http_routes() {
37 with_temp_ns(|client, ns| async move {
38 // Create a service
39 let svc = create_service(&client, &ns, "my-svc", 4191).await;
40
41 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
42 let config = rx
43 .next()
44 .await
45 .expect("watch must not fail")
46 .expect("watch must return an initial config");
47 tracing::trace!(?config);
48
49 assert_svc_meta(&config.metadata, &svc, 4191);
50
51 // There should be a default route.
52 detect_http_routes(&config, |routes| {
53 let route = assert_singleton(routes);
54 assert_route_is_default(route, &svc, 4191);
55 });
56 })
57 .await;
58}
59
60#[tokio::test(flavor = "current_thread")]
61async fn service_with_http_route_without_rules() {
62 with_temp_ns(|client, ns| async move {
63 // Create a service
64 let svc = create_service(&client, &ns, "my-svc", 4191).await;
65
66 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
67 let config = rx
68 .next()
69 .await
70 .expect("watch must not fail")
71 .expect("watch must return an initial config");
72 tracing::trace!(?config);
73
74 assert_svc_meta(&config.metadata, &svc, 4191);
75
76 // There should be a default route.
77 detect_http_routes(&config, |routes| {
78 let route = assert_singleton(routes);
79 assert_route_is_default(route, &svc, 4191);
80 });
81
82 let _route = create(&client, mk_empty_http_route(&ns, "foo-route", &svc, 4191)).await;
83
84 let config = rx
85 .next()
86 .await
87 .expect("watch must not fail")
88 .expect("watch must return an updated config");
89 tracing::trace!(?config);
90
91 assert_svc_meta(&config.metadata, &svc, 4191);
92
93 // There should be a route with no rules.
94 detect_http_routes(&config, |routes| {
95 let route = assert_singleton(routes);
96 assert_eq!(route.rules.len(), 0);
97 });
98 })
99 .await;
100}
101
102#[tokio::test(flavor = "current_thread")]
103async fn service_with_http_routes_without_backends() {
104 with_temp_ns(|client, ns| async move {
105 // Create a service
106 let svc = create_service(&client, &ns, "my-svc", 4191).await;
107
108 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
109 let config = rx
110 .next()
111 .await
112 .expect("watch must not fail")
113 .expect("watch must return an initial config");
114 tracing::trace!(?config);
115
116 assert_svc_meta(&config.metadata, &svc, 4191);
117
118 // There should be a default route.
119 detect_http_routes(&config, |routes| {
120 let route = assert_singleton(routes);
121 assert_route_is_default(route, &svc, 4191);
122 });
123
124 let _route = create(
125 &client,
126 mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
127 )
128 .await;
129
130 let config = rx
131 .next()
132 .await
133 .expect("watch must not fail")
134 .expect("watch must return an updated config");
135 tracing::trace!(?config);
136
137 assert_svc_meta(&config.metadata, &svc, 4191);
138
139 // There should be a route with the logical backend.
140 detect_http_routes(&config, |routes| {
141 let route = assert_singleton(routes);
142 let backends = route_backends_first_available(route);
143 let backend = assert_singleton(backends);
144 assert_backend_matches_service(backend, &svc, 4191);
145 });
146 })
147 .await;
148}
149
150#[tokio::test(flavor = "current_thread")]
151async fn service_with_http_routes_with_backend() {
152 with_temp_ns(|client, ns| async move {
153 // Create a service
154 let svc = create_service(&client, &ns, "my-svc", 4191).await;
155
156 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
157 let config = rx
158 .next()
159 .await
160 .expect("watch must not fail")
161 .expect("watch must return an initial config");
162 tracing::trace!(?config);
163
164 assert_svc_meta(&config.metadata, &svc, 4191);
165
166 // There should be a default route.
167 detect_http_routes(&config, |routes| {
168 let route = assert_singleton(routes);
169 assert_route_is_default(route, &svc, 4191);
170 });
171
172 let backend_name = "backend";
173 let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
174 let backends = [backend_name];
175 let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
176 Some(&backends),
177 None,
178 None,
179 );
180 let _route = create(&client, route.build()).await;
181
182 let config = rx
183 .next()
184 .await
185 .expect("watch must not fail")
186 .expect("watch must return an updated config");
187 tracing::trace!(?config);
188
189 assert_svc_meta(&config.metadata, &svc, 4191);
190
191 // There should be a route with a backend with no filters.
192 detect_http_routes(&config, |routes| {
193 let route = assert_singleton(routes);
194 let backends = route_backends_random_available(route);
195 let backend = assert_singleton(backends);
196 assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
197 let filters = &backend.backend.as_ref().unwrap().filters;
198 assert_eq!(filters.len(), 0);
199 });
200 })
201 .await;
202}
203
204#[tokio::test(flavor = "current_thread")]
205async fn service_with_http_routes_with_cross_namespace_backend() {
206 with_temp_ns(|client, ns| async move {
207 // Create a service
208 let svc = create_service(&client, &ns, "my-svc", 4191).await;
209
210 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
211 let config = rx
212 .next()
213 .await
214 .expect("watch must not fail")
215 .expect("watch must return an initial config");
216 tracing::trace!(?config);
217
218 assert_svc_meta(&config.metadata, &svc, 4191);
219
220 // There should be a default route.
221 detect_http_routes(&config, |routes| {
222 let route = assert_singleton(routes);
223 assert_route_is_default(route, &svc, 4191);
224 });
225
226 let backend_ns_name = format!("{}-backend", ns);
227 let backend_ns = create_cluster_scoped(
228 &client,
229 k8s::Namespace {
230 metadata: k8s::ObjectMeta {
231 name: Some(backend_ns_name.clone()),
232 labels: Some(convert_args!(btreemap!(
233 "linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
234 ))),
235 ..Default::default()
236 },
237 ..Default::default()
238 },
239 )
240 .await;
241 let backend_name = "backend";
242 let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await;
243 let backends = [backend_name];
244 let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
245 Some(&backends),
246 Some(backend_ns_name),
247 None,
248 );
249 let _route = create(&client, route.build()).await;
250
251 let config = rx
252 .next()
253 .await
254 .expect("watch must not fail")
255 .expect("watch must return an updated config");
256 tracing::trace!(?config);
257
258 assert_svc_meta(&config.metadata, &svc, 4191);
259
260 // There should be a route with a backend with no filters.
261 detect_http_routes(&config, |routes| {
262 let route = assert_singleton(routes);
263 let backends = route_backends_random_available(route);
264 let backend = assert_singleton(backends);
265 assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
266 let filters = &backend.backend.as_ref().unwrap().filters;
267 assert_eq!(filters.len(), 0);
268 });
269
270 delete_cluster_scoped(&client, backend_ns).await
271 })
272 .await;
273}
274
275// TODO: Test fails until handling of invalid backends is implemented.
276#[tokio::test(flavor = "current_thread")]
277async fn service_with_http_routes_with_invalid_backend() {
278 with_temp_ns(|client, ns| async move {
279 // Create a service
280 let svc = create_service(&client, &ns, "my-svc", 4191).await;
281
282 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
283 let config = rx
284 .next()
285 .await
286 .expect("watch must not fail")
287 .expect("watch must return an initial config");
288 tracing::trace!(?config);
289
290 assert_svc_meta(&config.metadata, &svc, 4191);
291
292 // There should be a default route.
293 detect_http_routes(&config, |routes| {
294 let route = assert_singleton(routes);
295 assert_route_is_default(route, &svc, 4191);
296 });
297
298 let backends = ["invalid-backend"];
299 let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
300 Some(&backends),
301 None,
302 None,
303 );
304 let _route = create(&client, route.build()).await;
305
306 let config = rx
307 .next()
308 .await
309 .expect("watch must not fail")
310 .expect("watch must return an updated config");
311 tracing::trace!(?config);
312
313 assert_svc_meta(&config.metadata, &svc, 4191);
314
315 // There should be a route with a backend.
316 detect_http_routes(&config, |routes| {
317 let route = assert_singleton(routes);
318 let backends = route_backends_random_available(route);
319 let backend = assert_singleton(backends);
320 assert_backend_has_failure_filter(backend);
321 });
322 })
323 .await;
324}
325
326// TODO: Investigate why the policy controller is only returning one route in this
327// case instead of two.
328#[tokio::test(flavor = "current_thread")]
329async fn service_with_multiple_http_routes() {
330 with_temp_ns(|client, ns| async move {
331 // Create a service
332 let svc = create_service(&client, &ns, "my-svc", 4191).await;
333
334 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
335 let config = rx
336 .next()
337 .await
338 .expect("watch must not fail")
339 .expect("watch must return an initial config");
340 tracing::trace!(?config);
341
342 assert_svc_meta(&config.metadata, &svc, 4191);
343
344 // There should be a default route.
345 detect_http_routes(&config, |routes| {
346 let route = assert_singleton(routes);
347 assert_route_is_default(route, &svc, 4191);
348 });
349
350 // Routes should be returned in sorted order by creation timestamp then
351 // name. To ensure that this test isn't timing dependant, routes should
352 // be created in alphabetical order.
353 let _a_route = create(
354 &client,
355 mk_http_route(&ns, "a-route", &svc, Some(4191)).build(),
356 )
357 .await;
358
359 // First route update.
360 let config = rx
361 .next()
362 .await
363 .expect("watch must not fail")
364 .expect("watch must return an updated config");
365 tracing::trace!(?config);
366
367 assert_svc_meta(&config.metadata, &svc, 4191);
368
369 let _b_route = create(
370 &client,
371 mk_http_route(&ns, "b-route", &svc, Some(4191)).build(),
372 )
373 .await;
374
375 // Second route update.
376 let config = rx
377 .next()
378 .await
379 .expect("watch must not fail")
380 .expect("watch must return an updated config");
381 tracing::trace!(?config);
382
383 assert_svc_meta(&config.metadata, &svc, 4191);
384
385 // There should be 2 routes, returned in order.
386 detect_http_routes(&config, |routes| {
387 assert_eq!(routes.len(), 2);
388 assert_eq!(route_name(&routes[0]), "a-route");
389 assert_eq!(route_name(&routes[1]), "b-route");
390 });
391 })
392 .await;
393}
394
395#[tokio::test(flavor = "current_thread")]
396async fn service_with_consecutive_failure_accrual() {
397 with_temp_ns(|client, ns| async move {
398 let svc = create_annotated_service(
399 &client,
400 &ns,
401 "consecutive-accrual-svc",
402 80,
403 BTreeMap::from([
404 (
405 "balancer.linkerd.io/failure-accrual".to_string(),
406 "consecutive".to_string(),
407 ),
408 (
409 "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
410 "8".to_string(),
411 ),
412 (
413 "balancer.linkerd.io/failure-accrual-consecutive-min-penalty".to_string(),
414 "10s".to_string(),
415 ),
416 (
417 "balancer.linkerd.io/failure-accrual-consecutive-max-penalty".to_string(),
418 "10m".to_string(),
419 ),
420 (
421 "balancer.linkerd.io/failure-accrual-consecutive-jitter-ratio".to_string(),
422 "1.0".to_string(),
423 ),
424 ]),
425 )
426 .await;
427
428 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
429 let config = rx
430 .next()
431 .await
432 .expect("watch must not fail")
433 .expect("watch must return an initial config");
434 tracing::trace!(?config);
435
436 detect_failure_accrual(&config, |accrual| {
437 let consecutive = failure_accrual_consecutive(accrual);
438 assert_eq!(8, consecutive.max_failures);
439 assert_eq!(
440 &grpc::outbound::ExponentialBackoff {
441 min_backoff: Some(Duration::from_secs(10).try_into().unwrap()),
442 max_backoff: Some(Duration::from_secs(600).try_into().unwrap()),
443 jitter_ratio: 1.0_f32,
444 },
445 consecutive
446 .backoff
447 .as_ref()
448 .expect("backoff must be configured")
449 );
450 });
451 })
452 .await;
453}
454
455#[tokio::test(flavor = "current_thread")]
456async fn service_with_consecutive_failure_accrual_defaults() {
457 with_temp_ns(|client, ns| async move {
458 // Create a service configured to do consecutive failure accrual, but
459 // with no additional configuration
460 let svc = create_annotated_service(
461 &client,
462 &ns,
463 "default-accrual-svc",
464 80,
465 BTreeMap::from([(
466 "balancer.linkerd.io/failure-accrual".to_string(),
467 "consecutive".to_string(),
468 )]),
469 )
470 .await;
471
472 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
473 let config = rx
474 .next()
475 .await
476 .expect("watch must not fail")
477 .expect("watch must return an initial config");
478 tracing::trace!(?config);
479
480 // Expect default max_failures and default backoff
481 detect_failure_accrual(&config, |accrual| {
482 let consecutive = failure_accrual_consecutive(accrual);
483 assert_eq!(7, consecutive.max_failures);
484 assert_default_accrual_backoff!(consecutive
485 .backoff
486 .as_ref()
487 .expect("backoff must be configured"));
488 });
489
490 // Create a service configured to do consecutive failure accrual with
491 // max number of failures and with default backoff
492 let svc = create_annotated_service(
493 &client,
494 &ns,
495 "no-backoff-svc",
496 80,
497 BTreeMap::from([
498 (
499 "balancer.linkerd.io/failure-accrual".to_string(),
500 "consecutive".to_string(),
501 ),
502 (
503 "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
504 "8".to_string(),
505 ),
506 ]),
507 )
508 .await;
509
510 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
511 let config = rx
512 .next()
513 .await
514 .expect("watch must not fail")
515 .expect("watch must return an initial config");
516 tracing::trace!(?config);
517
518 // Expect default backoff and overridden max_failures
519 detect_failure_accrual(&config, |accrual| {
520 let consecutive = failure_accrual_consecutive(accrual);
521 assert_eq!(8, consecutive.max_failures);
522 assert_default_accrual_backoff!(consecutive
523 .backoff
524 .as_ref()
525 .expect("backoff must be configured"));
526 });
527
528 // Create a service configured to do consecutive failure accrual with
529 // only the jitter ratio configured in the backoff
530 let svc = create_annotated_service(
531 &client,
532 &ns,
533 "only-jitter-svc",
534 80,
535 BTreeMap::from([
536 (
537 "balancer.linkerd.io/failure-accrual".to_string(),
538 "consecutive".to_string(),
539 ),
540 (
541 "balancer.linkerd.io/failure-accrual-consecutive-jitter-ratio".to_string(),
542 "1.0".to_string(),
543 ),
544 ]),
545 )
546 .await;
547
548 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
549 let config = rx
550 .next()
551 .await
552 .expect("watch must not fail")
553 .expect("watch must return an initial config");
554 tracing::trace!(?config);
555
556 // Expect defaults for everything except for the jitter ratio
557 detect_failure_accrual(&config, |accrual| {
558 let consecutive = failure_accrual_consecutive(accrual);
559 assert_eq!(7, consecutive.max_failures);
560 assert_eq!(
561 &grpc::outbound::ExponentialBackoff {
562 min_backoff: Some(Duration::from_secs(1).try_into().unwrap()),
563 max_backoff: Some(Duration::from_secs(60).try_into().unwrap()),
564 jitter_ratio: 1.0_f32,
565 },
566 consecutive
567 .backoff
568 .as_ref()
569 .expect("backoff must be configured")
570 );
571 });
572 })
573 .await;
574}
575
576#[tokio::test(flavor = "current_thread")]
577async fn service_with_default_failure_accrual() {
578 with_temp_ns(|client, ns| async move {
579 // Default config for Service, no failure accrual
580 let svc = create_service(&client, &ns, "default-failure-accrual", 80).await;
581
582 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
583 let config = rx
584 .next()
585 .await
586 .expect("watch must not fail")
587 .expect("watch must return an initial config");
588 tracing::trace!(?config);
589
590 // Expect failure accrual config to be default (no failure accrual)
591 detect_failure_accrual(&config, |accrual| {
592 assert!(
593 accrual.is_none(),
594 "consecutive failure accrual should not be configured for service"
595 );
596 });
597
598 // Create Service with consecutive failure accrual config for
599 // max_failures but no mode
600 let svc = create_annotated_service(
601 &client,
602 &ns,
603 "default-max-failure-svc",
604 80,
605 BTreeMap::from([(
606 "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
607 "8".to_string(),
608 )]),
609 )
610 .await;
611
612 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
613 let config = rx
614 .next()
615 .await
616 .expect("watch must not fail")
617 .expect("watch must return an initial config");
618 tracing::trace!(?config);
619
620 // Expect failure accrual config to be default (no failure accrual)
621 detect_failure_accrual(&config, |accrual| {
622 assert!(
623 accrual.is_none(),
624 "consecutive failure accrual should not be configured for service"
625 )
626 });
627 })
628 .await;
629}
630
631#[tokio::test(flavor = "current_thread")]
632async fn opaque_service() {
633 with_temp_ns(|client, ns| async move {
634 // Create a service
635 let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await;
636
637 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
638 let config = rx
639 .next()
640 .await
641 .expect("watch must not fail")
642 .expect("watch must return an initial config");
643 tracing::trace!(?config);
644
645 // Proxy protocol should be opaque.
646 match config.protocol.unwrap().kind.unwrap() {
647 grpc::outbound::proxy_protocol::Kind::Opaque(_) => {}
648 _ => panic!("proxy protocol must be Opaque"),
649 };
650 })
651 .await;
652}
653
654#[tokio::test(flavor = "current_thread")]
655async fn route_with_filters() {
656 with_temp_ns(|client, ns| async move {
657 // Create a service
658 let svc = create_service(&client, &ns, "my-svc", 4191).await;
659
660 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
661 let config = rx
662 .next()
663 .await
664 .expect("watch must not fail")
665 .expect("watch must return an initial config");
666 tracing::trace!(?config);
667
668 // There should be a default route.
669 detect_http_routes(&config, |routes| {
670 let route = assert_singleton(routes);
671 assert_route_is_default(route, &svc, 4191);
672 });
673
674 let backend_name = "backend";
675 let backends = [backend_name];
676 let route = mk_http_route(&ns, "foo-route", &svc, Some(4191))
677 .with_backends(Some(&backends), None, None)
678 .with_filters(Some(vec![
679 k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
680 request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
681 set: Some(vec![k8s_gateway_api::HttpHeader {
682 name: "set".to_string(),
683 value: "set-value".to_string(),
684 }]),
685 add: Some(vec![k8s_gateway_api::HttpHeader {
686 name: "add".to_string(),
687 value: "add-value".to_string(),
688 }]),
689 remove: Some(vec!["remove".to_string()]),
690 },
691 },
692 k8s_gateway_api::HttpRouteFilter::RequestRedirect {
693 request_redirect: k8s_gateway_api::HttpRequestRedirectFilter {
694 scheme: Some("http".to_string()),
695 hostname: Some("host".to_string()),
696 path: Some(k8s_gateway_api::HttpPathModifier::ReplacePrefixMatch {
697 replace_prefix_match: "/path".to_string(),
698 }),
699 port: Some(5555),
700 status_code: Some(302),
701 },
702 },
703 ]));
704 let _route = create(&client, route.build()).await;
705
706 let config = rx
707 .next()
708 .await
709 .expect("watch must not fail")
710 .expect("watch must return an updated config");
711 tracing::trace!(?config);
712
713 // There should be a route with filters.
714 detect_http_routes(&config, |routes| {
715 let route = assert_singleton(routes);
716 let rule = assert_singleton(&route.rules);
717 let filters = &rule.filters;
718 assert_eq!(
719 *filters,
720 vec![
721 grpc::outbound::http_route::Filter {
722 kind: Some(
723 grpc::outbound::http_route::filter::Kind::RequestHeaderModifier(
724 grpc::http_route::RequestHeaderModifier {
725 add: Some(grpc::http_types::Headers {
726 headers: vec![grpc::http_types::headers::Header {
727 name: "add".to_string(),
728 value: "add-value".into(),
729 }]
730 }),
731 set: Some(grpc::http_types::Headers {
732 headers: vec![grpc::http_types::headers::Header {
733 name: "set".to_string(),
734 value: "set-value".into(),
735 }]
736 }),
737 remove: vec!["remove".to_string()],
738 }
739 )
740 )
741 },
742 grpc::outbound::http_route::Filter {
743 kind: Some(grpc::outbound::http_route::filter::Kind::Redirect(
744 grpc::http_route::RequestRedirect {
745 scheme: Some(grpc::http_types::Scheme {
746 r#type: Some(grpc::http_types::scheme::Type::Registered(
747 grpc::http_types::scheme::Registered::Http.into(),
748 ))
749 }),
750 host: "host".to_string(),
751 path: Some(linkerd2_proxy_api::http_route::PathModifier { replace: Some(linkerd2_proxy_api::http_route::path_modifier::Replace::Prefix("/path".to_string())) }),
752 port: 5555,
753 status: 302,
754 }
755 ))
756 }
757 ]
758 );
759 });
760 })
761 .await;
762}
763
764#[tokio::test(flavor = "current_thread")]
765async fn backend_with_filters() {
766 with_temp_ns(|client, ns| async move {
767 // Create a service
768 let svc = create_service(&client, &ns, "my-svc", 4191).await;
769
770 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
771 let config = rx
772 .next()
773 .await
774 .expect("watch must not fail")
775 .expect("watch must return an initial config");
776 tracing::trace!(?config);
777
778 // There should be a default route.
779 detect_http_routes(&config, |routes| {
780 let route = assert_singleton(routes);
781 assert_route_is_default(route, &svc, 4191);
782 });
783
784 let backend_name = "backend";
785 let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
786 let backends = [backend_name];
787 let route = mk_http_route(&ns, "foo-route", &svc, Some(4191))
788 .with_backends(Some(&backends), None, Some(vec![
789 k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
790 request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
791 set: Some(vec![k8s_gateway_api::HttpHeader {
792 name: "set".to_string(),
793 value: "set-value".to_string(),
794 }]),
795 add: Some(vec![k8s_gateway_api::HttpHeader {
796 name: "add".to_string(),
797 value: "add-value".to_string(),
798 }]),
799 remove: Some(vec!["remove".to_string()]),
800 },
801 },
802 k8s_gateway_api::HttpRouteFilter::RequestRedirect {
803 request_redirect: k8s_gateway_api::HttpRequestRedirectFilter {
804 scheme: Some("http".to_string()),
805 hostname: Some("host".to_string()),
806 path: Some(k8s_gateway_api::HttpPathModifier::ReplacePrefixMatch {
807 replace_prefix_match: "/path".to_string(),
808 }),
809 port: Some(5555),
810 status_code: Some(302),
811 },
812 },
813 ]));
814 let _route = create(&client, route.build()).await;
815
816 let config = rx
817 .next()
818 .await
819 .expect("watch must not fail")
820 .expect("watch must return an updated config");
821 tracing::trace!(?config);
822
823 // There should be a route without rule filters.
824 detect_http_routes(&config, |routes| {
825 let route = assert_singleton(routes);
826 let rule = assert_singleton(&route.rules);
827 assert_eq!(rule.filters.len(), 0);
828 let backends = route_backends_random_available(route);
829 let backend = assert_singleton(backends);
830 assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
831 let filters = &backend.backend.as_ref().unwrap().filters;
832 assert_eq!(
833 *filters,
834 vec![
835 grpc::outbound::http_route::Filter {
836 kind: Some(
837 grpc::outbound::http_route::filter::Kind::RequestHeaderModifier(
838 grpc::http_route::RequestHeaderModifier {
839 add: Some(grpc::http_types::Headers {
840 headers: vec![grpc::http_types::headers::Header {
841 name: "add".to_string(),
842 value: "add-value".into(),
843 }]
844 }),
845 set: Some(grpc::http_types::Headers {
846 headers: vec![grpc::http_types::headers::Header {
847 name: "set".to_string(),
848 value: "set-value".into(),
849 }]
850 }),
851 remove: vec!["remove".to_string()],
852 }
853 )
854 )
855 },
856 grpc::outbound::http_route::Filter {
857 kind: Some(grpc::outbound::http_route::filter::Kind::Redirect(
858 grpc::http_route::RequestRedirect {
859 scheme: Some(grpc::http_types::Scheme {
860 r#type: Some(grpc::http_types::scheme::Type::Registered(
861 grpc::http_types::scheme::Registered::Http.into(),
862 ))
863 }),
864 host: "host".to_string(),
865 path: Some(linkerd2_proxy_api::http_route::PathModifier { replace: Some(linkerd2_proxy_api::http_route::path_modifier::Replace::Prefix("/path".to_string())) }),
866 port: 5555,
867 status: 302,
868 }
869 ))
870 }
871 ]
872 );
873 });
874 })
875 .await;
876}
877
878#[tokio::test(flavor = "current_thread")]
879async fn http_route_with_no_port() {
880 with_temp_ns(|client, ns| async move {
881 // Create a service
882 let svc = create_service(&client, &ns, "my-svc", 4191).await;
883
884 let mut rx_4191 = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
885 let config_4191 = rx_4191
886 .next()
887 .await
888 .expect("watch must not fail")
889 .expect("watch must return an initial config");
890 tracing::trace!(?config_4191);
891
892 let mut rx_9999 = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
893 let config_9999 = rx_9999
894 .next()
895 .await
896 .expect("watch must not fail")
897 .expect("watch must return an initial config");
898 tracing::trace!(?config_9999);
899
900 // There should be a default route.
901 detect_http_routes(&config_4191, |routes| {
902 let route = assert_singleton(routes);
903 assert_route_is_default(route, &svc, 4191);
904 });
905 detect_http_routes(&config_9999, |routes| {
906 let route = assert_singleton(routes);
907 assert_route_is_default(route, &svc, 9999);
908 });
909
910 let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;
911
912 let config_4191 = rx_4191
913 .next()
914 .await
915 .expect("watch must not fail")
916 .expect("watch must return an updated config");
917 tracing::trace!(?config_4191);
918
919 // The route should apply to the service.
920 detect_http_routes(&config_4191, |routes| {
921 let route = assert_singleton(routes);
922 assert_route_name_eq(route, "foo-route");
923 });
924
925 let config_9999 = rx_9999
926 .next()
927 .await
928 .expect("watch must not fail")
929 .expect("watch must return an updated config");
930 tracing::trace!(?config_9999);
931
932 // The route should apply to other ports too.
933 detect_http_routes(&config_9999, |routes| {
934 let route = assert_singleton(routes);
935 assert_route_name_eq(route, "foo-route");
936 });
937 })
938 .await;
939}
940
941#[tokio::test(flavor = "current_thread")]
942async fn producer_route() {
943 with_temp_ns(|client, ns| async move {
944 // Create a service
945 let svc = create_service(&client, &ns, "my-svc", 4191).await;
946
947 let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
948 let producer_config = producer_rx
949 .next()
950 .await
951 .expect("watch must not fail")
952 .expect("watch must return an initial config");
953 tracing::trace!(?producer_config);
954
955 let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
956 let consumer_config = consumer_rx
957 .next()
958 .await
959 .expect("watch must not fail")
960 .expect("watch must return an initial config");
961 tracing::trace!(?consumer_config);
962
963 // There should be a default route.
964 detect_http_routes(&producer_config, |routes| {
965 let route = assert_singleton(routes);
966 assert_route_is_default(route, &svc, 4191);
967 });
968 detect_http_routes(&consumer_config, |routes| {
969 let route = assert_singleton(routes);
970 assert_route_is_default(route, &svc, 4191);
971 });
972
973 // A route created in the same namespace as its parent service is called
974 // a producer route. It should be returned in outbound policy requests
975 // for that service from ALL namespaces.
976 let _route = create(
977 &client,
978 mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
979 )
980 .await;
981
982 let producer_config = producer_rx
983 .next()
984 .await
985 .expect("watch must not fail")
986 .expect("watch must return an updated config");
987 tracing::trace!(?producer_config);
988 let consumer_config = consumer_rx
989 .next()
990 .await
991 .expect("watch must not fail")
992 .expect("watch must return an initial config");
993 tracing::trace!(?consumer_config);
994
995 // The route should be returned in queries from the producer namespace.
996 detect_http_routes(&producer_config, |routes| {
997 let route = assert_singleton(routes);
998 assert_route_name_eq(route, "foo-route");
999 });
1000
1001 // The route should be returned in queries from a consumer namespace.
1002 detect_http_routes(&consumer_config, |routes| {
1003 let route = assert_singleton(routes);
1004 assert_route_name_eq(route, "foo-route");
1005 });
1006 })
1007 .await;
1008}
1009
1010#[tokio::test(flavor = "current_thread")]
1011async fn pre_existing_producer_route() {
1012 // We test the scenario where outbound policy watches are initiated after
1013 // a produce route already exists.
1014 with_temp_ns(|client, ns| async move {
1015 // Create a service
1016 let svc = create_service(&client, &ns, "my-svc", 4191).await;
1017
1018 // A route created in the same namespace as its parent service is called
1019 // a producer route. It should be returned in outbound policy requests
1020 // for that service from ALL namespaces.
1021 let _route = create(
1022 &client,
1023 mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
1024 )
1025 .await;
1026
1027 let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
1028 let producer_config = producer_rx
1029 .next()
1030 .await
1031 .expect("watch must not fail")
1032 .expect("watch must return an initial config");
1033 tracing::trace!(?producer_config);
1034
1035 let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
1036 let consumer_config = consumer_rx
1037 .next()
1038 .await
1039 .expect("watch must not fail")
1040 .expect("watch must return an initial config");
1041 tracing::trace!(?consumer_config);
1042
1043 // The route should be returned in queries from the producer namespace.
1044 detect_http_routes(&producer_config, |routes| {
1045 let route = assert_singleton(routes);
1046 assert_route_name_eq(route, "foo-route");
1047 });
1048
1049 // The route should be returned in queries from a consumer namespace.
1050 detect_http_routes(&consumer_config, |routes| {
1051 let route = assert_singleton(routes);
1052 assert_route_name_eq(route, "foo-route");
1053 });
1054 })
1055 .await;
1056}
1057
1058#[tokio::test(flavor = "current_thread")]
1059async fn consumer_route() {
1060 with_temp_ns(|client, ns| async move {
1061 // Create a service
1062 let svc = create_service(&client, &ns, "my-svc", 4191).await;
1063
1064 let consumer_ns_name = format!("{}-consumer", ns);
1065 let consumer_ns = create_cluster_scoped(
1066 &client,
1067 k8s::Namespace {
1068 metadata: k8s::ObjectMeta {
1069 name: Some(consumer_ns_name.clone()),
1070 labels: Some(convert_args!(btreemap!(
1071 "linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
1072 ))),
1073 ..Default::default()
1074 },
1075 ..Default::default()
1076 },
1077 )
1078 .await;
1079
1080 let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
1081 let producer_config = producer_rx
1082 .next()
1083 .await
1084 .expect("watch must not fail")
1085 .expect("watch must return an initial config");
1086 tracing::trace!(?producer_config);
1087
1088 let mut consumer_rx =
1089 retry_watch_outbound_policy(&client, &consumer_ns_name, &svc, 4191).await;
1090 let consumer_config = consumer_rx
1091 .next()
1092 .await
1093 .expect("watch must not fail")
1094 .expect("watch must return an initial config");
1095 tracing::trace!(?consumer_config);
1096
1097 let mut other_rx = retry_watch_outbound_policy(&client, "other_ns", &svc, 4191).await;
1098 let other_config = other_rx
1099 .next()
1100 .await
1101 .expect("watch must not fail")
1102 .expect("watch must return an initial config");
1103 tracing::trace!(?other_config);
1104
1105 // There should be a default route.
1106 detect_http_routes(&producer_config, |routes| {
1107 let route = assert_singleton(routes);
1108 assert_route_is_default(route, &svc, 4191);
1109 });
1110 detect_http_routes(&consumer_config, |routes| {
1111 let route = assert_singleton(routes);
1112 assert_route_is_default(route, &svc, 4191);
1113 });
1114 detect_http_routes(&other_config, |routes| {
1115 let route = assert_singleton(routes);
1116 assert_route_is_default(route, &svc, 4191);
1117 });
1118
1119 // A route created in a different namespace as its parent service is
1120 // called a consumer route. It should be returned in outbound policy
1121 // requests for that service ONLY when the request comes from the
1122 // consumer namespace.
1123 let _route = create(
1124 &client,
1125 mk_http_route(&consumer_ns_name, "foo-route", &svc, Some(4191)).build(),
1126 )
1127 .await;
1128
1129 // The route should NOT be returned in queries from the producer namespace.
1130 // There should be a default route.
1131 assert!(producer_rx.next().now_or_never().is_none());
1132
1133 // The route should be returned in queries from the same consumer
1134 // namespace.
1135 let consumer_config = consumer_rx
1136 .next()
1137 .await
1138 .expect("watch must not fail")
1139 .expect("watch must return an initial config");
1140 tracing::trace!(?consumer_config);
1141
1142 detect_http_routes(&consumer_config, |routes| {
1143 let route = assert_singleton(routes);
1144 assert_route_name_eq(route, "foo-route");
1145 });
1146
1147 // The route should NOT be returned in queries from a different consumer
1148 // namespace.
1149 assert!(other_rx.next().now_or_never().is_none());
1150
1151 delete_cluster_scoped(&client, consumer_ns).await;
1152 })
1153 .await;
1154}
1155
1156/* Helpers */
1157
/// Builder for test routes: wraps a `k8s_gateway_api::HttpRoute` that
/// `mk_http_route` creates, the `with_*` methods adjust, and `build` returns.
struct HttpRouteBuilder(k8s_gateway_api::HttpRoute);
1159
1160async fn retry_watch_outbound_policy(
1161 client: &kube::Client,
1162 ns: &str,
1163 svc: &k8s::Service,
1164 port: u16,
1165) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
1166 // Port-forward to the control plane and start watching the service's
1167 // outbound policy.
1168 let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
1169 loop {
1170 match policy_api.watch(ns, svc, port).await {
1171 Ok(rx) => return rx,
1172 Err(error) => {
1173 tracing::error!(
1174 ?error,
1175 ns,
1176 svc = svc.name_unchecked(),
1177 "failed to watch outbound policy for port 4191"
1178 );
1179 time::sleep(Duration::from_secs(1)).await;
1180 }
1181 }
1182 }
1183}
1184
1185fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
1186 use k8s_gateway_api as api;
1187
1188 HttpRouteBuilder(api::HttpRoute {
1189 metadata: kube::api::ObjectMeta {
1190 namespace: Some(ns.to_string()),
1191 name: Some(name.to_string()),
1192 ..Default::default()
1193 },
1194 spec: api::HttpRouteSpec {
1195 inner: api::CommonRouteSpec {
1196 parent_refs: Some(vec![api::ParentReference {
1197 group: Some("core".to_string()),
1198 kind: Some("Service".to_string()),
1199 namespace: svc.namespace(),
1200 name: svc.name_unchecked(),
1201 section_name: None,
1202 port,
1203 }]),
1204 },
1205 hostnames: None,
1206 rules: Some(vec![api::HttpRouteRule {
1207 matches: Some(vec![api::HttpRouteMatch {
1208 path: Some(api::HttpPathMatch::Exact {
1209 value: "/foo".to_string(),
1210 }),
1211 headers: None,
1212 query_params: None,
1213 method: Some("GET".to_string()),
1214 }]),
1215 filters: None,
1216 backend_refs: None,
1217 }]),
1218 },
1219 status: None,
1220 })
1221}
1222
1223impl HttpRouteBuilder {
1224 fn with_backends(
1225 self,
1226 backends: Option<&[&str]>,
1227 backends_ns: Option<String>,
1228 backend_filters: Option<Vec<k8s_gateway_api::HttpRouteFilter>>,
1229 ) -> Self {
1230 let mut route = self.0;
1231 let backend_refs = backends.map(|names| {
1232 names
1233 .iter()
1234 .map(|name| k8s_gateway_api::HttpBackendRef {
1235 backend_ref: Some(k8s_gateway_api::BackendRef {
1236 weight: None,
1237 inner: k8s_gateway_api::BackendObjectReference {
1238 name: name.to_string(),
1239 port: Some(8888),
1240 group: None,
1241 kind: None,
1242 namespace: backends_ns.clone(),
1243 },
1244 }),
1245 filters: backend_filters.clone(),
1246 })
1247 .collect()
1248 });
1249 route.spec.rules.iter_mut().flatten().for_each(|rule| {
1250 rule.backend_refs = backend_refs.clone();
1251 });
1252 Self(route)
1253 }
1254
1255 fn with_filters(self, filters: Option<Vec<k8s_gateway_api::HttpRouteFilter>>) -> Self {
1256 let mut route = self.0;
1257 route
1258 .spec
1259 .rules
1260 .iter_mut()
1261 .flatten()
1262 .for_each(|rule| rule.filters = filters.clone());
1263 Self(route)
1264 }
1265
1266 fn build(self) -> k8s_gateway_api::HttpRoute {
1267 self.0
1268 }
1269}
1270
1271fn mk_empty_http_route(
1272 ns: &str,
1273 name: &str,
1274 svc: &k8s::Service,
1275 port: u16,
1276) -> k8s_gateway_api::HttpRoute {
1277 use k8s_gateway_api as api;
1278 api::HttpRoute {
1279 metadata: kube::api::ObjectMeta {
1280 namespace: Some(ns.to_string()),
1281 name: Some(name.to_string()),
1282 ..Default::default()
1283 },
1284 spec: api::HttpRouteSpec {
1285 inner: api::CommonRouteSpec {
1286 parent_refs: Some(vec![api::ParentReference {
1287 group: Some("core".to_string()),
1288 kind: Some("Service".to_string()),
1289 namespace: svc.namespace(),
1290 name: svc.name_unchecked(),
1291 section_name: None,
1292 port: Some(port),
1293 }]),
1294 },
1295 hostnames: None,
1296 rules: Some(vec![]),
1297 },
1298 status: None,
1299 }
1300}
1301
1302// detect_http_routes asserts that the given outbound policy has a proxy protcol
1303// of "Detect" and then invokes the given function with the Http1 and Http2
1304// routes from the Detect.
1305#[track_caller]
1306fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
1307where
1308 F: Fn(&[grpc::outbound::HttpRoute]),
1309{
1310 let kind = config
1311 .protocol
1312 .as_ref()
1313 .expect("must have proxy protocol")
1314 .kind
1315 .as_ref()
1316 .expect("must have kind");
1317 if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
1318 opaque: _,
1319 timeout: _,
1320 http1,
1321 http2,
1322 }) = kind
1323 {
1324 let http1 = http1
1325 .as_ref()
1326 .expect("proxy protocol must have http1 field");
1327 let http2 = http2
1328 .as_ref()
1329 .expect("proxy protocol must have http2 field");
1330 f(&http1.routes);
1331 f(&http2.routes);
1332 } else {
1333 panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
1334 }
1335}
1336
1337#[track_caller]
1338fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
1339where
1340 F: Fn(Option<&grpc::outbound::FailureAccrual>),
1341{
1342 let kind = config
1343 .protocol
1344 .as_ref()
1345 .expect("must have proxy protocol")
1346 .kind
1347 .as_ref()
1348 .expect("must have kind");
1349 if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
1350 opaque: _,
1351 timeout: _,
1352 http1,
1353 http2,
1354 }) = kind
1355 {
1356 let http1 = http1
1357 .as_ref()
1358 .expect("proxy protocol must have http1 field");
1359 let http2 = http2
1360 .as_ref()
1361 .expect("proxy protocol must have http2 field");
1362 f(http1.failure_accrual.as_ref());
1363 f(http2.failure_accrual.as_ref());
1364 } else {
1365 panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
1366 }
1367}
1368
1369#[track_caller]
1370fn failure_accrual_consecutive(
1371 accrual: Option<&grpc::outbound::FailureAccrual>,
1372) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
1373 assert!(
1374 accrual.is_some(),
1375 "failure accrual must be configured for service"
1376 );
1377 let kind = accrual
1378 .unwrap()
1379 .kind
1380 .as_ref()
1381 .expect("failure accrual must have kind");
1382 let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
1383 accrual
1384}
1385
1386#[track_caller]
1387fn route_backends_first_available(
1388 route: &grpc::outbound::HttpRoute,
1389) -> &[grpc::outbound::http_route::RouteBackend] {
1390 let kind = assert_singleton(&route.rules)
1391 .backends
1392 .as_ref()
1393 .expect("Rule must have backends")
1394 .kind
1395 .as_ref()
1396 .expect("Backend must have kind");
1397 match kind {
1398 grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
1399 _ => panic!("Distribution must be FirstAvailable"),
1400 }
1401}
1402
1403#[track_caller]
1404fn route_backends_random_available(
1405 route: &grpc::outbound::HttpRoute,
1406) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
1407 let kind = assert_singleton(&route.rules)
1408 .backends
1409 .as_ref()
1410 .expect("Rule must have backends")
1411 .kind
1412 .as_ref()
1413 .expect("Backend must have kind");
1414 match kind {
1415 grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
1416 _ => panic!("Distribution must be RandomAvailable"),
1417 }
1418}
1419
1420#[track_caller]
1421fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
1422 match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
1423 grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
1424 _ => panic!("route must be a resource kind"),
1425 }
1426}
1427
1428#[track_caller]
1429fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::WeightedRouteBackend) {
1430 let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
1431 match filter.kind.as_ref().unwrap() {
1432 grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
1433 _ => panic!("backend must have FailureInjector filter"),
1434 };
1435}
1436
1437#[track_caller]
1438fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
1439 let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
1440 match kind {
1441 grpc::meta::metadata::Kind::Default(_) => {}
1442 grpc::meta::metadata::Kind::Resource(r) => {
1443 panic!("route expected to be default but got resource {r:?}")
1444 }
1445 }
1446
1447 let backends = route_backends_first_available(route);
1448 let backend = assert_singleton(backends);
1449 assert_backend_matches_service(backend, svc, port);
1450
1451 let rule = assert_singleton(&route.rules);
1452 let route_match = assert_singleton(&rule.matches);
1453 let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
1454 assert_eq!(
1455 *path_match,
1456 grpc::http_route::path_match::Kind::Prefix("/".to_string())
1457 );
1458}
1459
1460#[track_caller]
1461fn assert_backend_matches_service(
1462 backend: &grpc::outbound::http_route::RouteBackend,
1463 svc: &k8s::Service,
1464 port: u16,
1465) {
1466 let backend = backend.backend.as_ref().unwrap();
1467 let dst = match backend.kind.as_ref().unwrap() {
1468 grpc::outbound::backend::Kind::Balancer(balance) => {
1469 let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
1470 match kind {
1471 grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
1472 }
1473 }
1474 grpc::outbound::backend::Kind::Forward(_) => {
1475 panic!("default route backend must be Balancer")
1476 }
1477 };
1478 assert_eq!(
1479 *dst,
1480 format!(
1481 "{}.{}.svc.{}:{}",
1482 svc.name_unchecked(),
1483 svc.namespace().unwrap(),
1484 "cluster.local",
1485 port
1486 )
1487 );
1488
1489 assert_svc_meta(&backend.metadata, svc, port)
1490}
1491
/// Asserts that `ts` holds exactly one element and returns a reference to it.
#[track_caller]
fn assert_singleton<T>(ts: &[T]) -> &T {
    let count = ts.len();
    assert_eq!(count, 1);
    // The length was just checked, so index 0 is in bounds.
    &ts[0]
}
1497
1498#[track_caller]
1499fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
1500 let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
1501 match kind {
1502 grpc::meta::metadata::Kind::Default(d) => {
1503 panic!("route expected to not be default, but got default {d:?}")
1504 }
1505 grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
1506 }
1507}
View as plain text