1use std::{collections::BTreeMap, time::Duration};
2
3use futures::prelude::*;
4use kube::ResourceExt;
5use linkerd_policy_controller_k8s_api as k8s;
6use linkerd_policy_test::{
7 assert_default_accrual_backoff, assert_svc_meta, create, create_annotated_service,
8 create_cluster_scoped, create_opaque_service, create_service, delete_cluster_scoped, grpc,
9 mk_service, with_temp_ns,
10};
11use maplit::{btreemap, convert_args};
12use tokio::time;
13
14// These tests are copies of the tests in outbound_api_gateway.rs but using the
15// policy.linkerd.io HttpRoute kubernetes types instead of the Gateway API ones.
16// These two files should be kept in sync to ensure that Linkerd can read and
17// function correctly with both types of resources.
18
19#[tokio::test(flavor = "current_thread")]
20async fn service_does_not_exist() {
21 with_temp_ns(|client, ns| async move {
22 // Build a service but don't apply it to the cluster.
23 let mut svc = mk_service(&ns, "my-svc", 4191);
24 // Give it a bogus cluster ip.
25 svc.spec.as_mut().unwrap().cluster_ip = Some("1.1.1.1".to_string());
26
27 let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(&client).await;
28 let rsp = policy_api.watch(&ns, &svc, 4191).await;
29
30 assert!(rsp.is_err());
31 assert_eq!(rsp.err().unwrap().code(), tonic::Code::NotFound);
32 })
33 .await;
34}
35
36#[tokio::test(flavor = "current_thread")]
37async fn service_with_no_http_routes() {
38 with_temp_ns(|client, ns| async move {
39 // Create a service
40 let svc = create_service(&client, &ns, "my-svc", 4191).await;
41
42 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
43 let config = rx
44 .next()
45 .await
46 .expect("watch must not fail")
47 .expect("watch must return an initial config");
48 tracing::trace!(?config);
49
50 assert_svc_meta(&config.metadata, &svc, 4191);
51
52 // There should be a default route.
53 detect_http_routes(&config, |routes| {
54 let route = assert_singleton(routes);
55 assert_route_is_default(route, &svc, 4191);
56 });
57 })
58 .await;
59}
60
61#[tokio::test(flavor = "current_thread")]
62async fn service_with_http_route_without_rules() {
63 with_temp_ns(|client, ns| async move {
64 // Create a service
65 let svc = create_service(&client, &ns, "my-svc", 4191).await;
66
67 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
68 let config = rx
69 .next()
70 .await
71 .expect("watch must not fail")
72 .expect("watch must return an initial config");
73 tracing::trace!(?config);
74
75 assert_svc_meta(&config.metadata, &svc, 4191);
76
77 // There should be a default route.
78 detect_http_routes(&config, |routes| {
79 let route = assert_singleton(routes);
80 assert_route_is_default(route, &svc, 4191);
81 });
82
83 let _route = create(&client, mk_empty_http_route(&ns, "foo-route", &svc, 4191)).await;
84
85 let config = rx
86 .next()
87 .await
88 .expect("watch must not fail")
89 .expect("watch must return an updated config");
90 tracing::trace!(?config);
91
92 assert_svc_meta(&config.metadata, &svc, 4191);
93
94 // There should be a route with no rules.
95 detect_http_routes(&config, |routes| {
96 let route = assert_singleton(routes);
97 assert_eq!(route.rules.len(), 0);
98 });
99 })
100 .await;
101}
102
103#[tokio::test(flavor = "current_thread")]
104async fn service_with_http_routes_without_backends() {
105 with_temp_ns(|client, ns| async move {
106 // Create a service
107 let svc = create_service(&client, &ns, "my-svc", 4191).await;
108
109 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
110 let config = rx
111 .next()
112 .await
113 .expect("watch must not fail")
114 .expect("watch must return an initial config");
115 tracing::trace!(?config);
116
117 assert_svc_meta(&config.metadata, &svc, 4191);
118
119 // There should be a default route.
120 detect_http_routes(&config, |routes| {
121 let route = assert_singleton(routes);
122 assert_route_is_default(route, &svc, 4191);
123 });
124
125 let _route = create(
126 &client,
127 mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
128 )
129 .await;
130
131 let config = rx
132 .next()
133 .await
134 .expect("watch must not fail")
135 .expect("watch must return an updated config");
136 tracing::trace!(?config);
137
138 assert_svc_meta(&config.metadata, &svc, 4191);
139
140 // There should be a route with the logical backend.
141 detect_http_routes(&config, |routes| {
142 let route = assert_singleton(routes);
143 let backends = route_backends_first_available(route);
144 let backend = assert_singleton(backends);
145 assert_backend_matches_service(backend, &svc, 4191);
146 });
147 })
148 .await;
149}
150
151#[tokio::test(flavor = "current_thread")]
152async fn service_with_http_routes_with_backend() {
153 with_temp_ns(|client, ns| async move {
154 // Create a service
155 let svc = create_service(&client, &ns, "my-svc", 4191).await;
156
157 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
158 let config = rx
159 .next()
160 .await
161 .expect("watch must not fail")
162 .expect("watch must return an initial config");
163 tracing::trace!(?config);
164
165 assert_svc_meta(&config.metadata, &svc, 4191);
166
167 // There should be a default route.
168 detect_http_routes(&config, |routes| {
169 let route = assert_singleton(routes);
170 assert_route_is_default(route, &svc, 4191);
171 });
172
173 let backend_name = "backend";
174 let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
175 let backends = [backend_name];
176 let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
177 Some(&backends),
178 None,
179 None,
180 );
181 let _route = create(&client, route.build()).await;
182
183 let config = rx
184 .next()
185 .await
186 .expect("watch must not fail")
187 .expect("watch must return an updated config");
188 tracing::trace!(?config);
189
190 assert_svc_meta(&config.metadata, &svc, 4191);
191
192 // There should be a route with a backend with no filters.
193 detect_http_routes(&config, |routes| {
194 let route = assert_singleton(routes);
195 let backends = route_backends_random_available(route);
196 let backend = assert_singleton(backends);
197 assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
198 let filters = &backend.backend.as_ref().unwrap().filters;
199 assert_eq!(filters.len(), 0);
200 });
201 })
202 .await;
203}
204
205#[tokio::test(flavor = "current_thread")]
206async fn service_with_http_routes_with_cross_namespace_backend() {
207 with_temp_ns(|client, ns| async move {
208 // Create a service
209 let svc = create_service(&client, &ns, "my-svc", 4191).await;
210
211 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
212 let config = rx
213 .next()
214 .await
215 .expect("watch must not fail")
216 .expect("watch must return an initial config");
217 tracing::trace!(?config);
218
219 assert_svc_meta(&config.metadata, &svc, 4191);
220
221 // There should be a default route.
222 detect_http_routes(&config, |routes| {
223 let route = assert_singleton(routes);
224 assert_route_is_default(route, &svc, 4191);
225 });
226
227 let backend_ns_name = format!("{}-backend", ns);
228 let backend_ns = create_cluster_scoped(
229 &client,
230 k8s::Namespace {
231 metadata: k8s::ObjectMeta {
232 name: Some(backend_ns_name.clone()),
233 labels: Some(convert_args!(btreemap!(
234 "linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
235 ))),
236 ..Default::default()
237 },
238 ..Default::default()
239 },
240 )
241 .await;
242 let backend_name = "backend";
243 let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await;
244 let backends = [backend_name];
245 let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
246 Some(&backends),
247 Some(backend_ns_name),
248 None,
249 );
250 let _route = create(&client, route.build()).await;
251
252 let config = rx
253 .next()
254 .await
255 .expect("watch must not fail")
256 .expect("watch must return an updated config");
257 tracing::trace!(?config);
258
259 assert_svc_meta(&config.metadata, &svc, 4191);
260
261 // There should be a route with a backend with no filters.
262 detect_http_routes(&config, |routes| {
263 let route = assert_singleton(routes);
264 let backends = route_backends_random_available(route);
265 let backend = assert_singleton(backends);
266 assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
267 let filters = &backend.backend.as_ref().unwrap().filters;
268 assert_eq!(filters.len(), 0);
269 });
270
271 delete_cluster_scoped(&client, backend_ns).await
272 })
273 .await;
274}
275
276// TODO: Test fails until handling of invalid backends is implemented.
277#[tokio::test(flavor = "current_thread")]
278async fn service_with_http_routes_with_invalid_backend() {
279 with_temp_ns(|client, ns| async move {
280 // Create a service
281 let svc = create_service(&client, &ns, "my-svc", 4191).await;
282
283 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
284 let config = rx
285 .next()
286 .await
287 .expect("watch must not fail")
288 .expect("watch must return an initial config");
289 tracing::trace!(?config);
290
291 assert_svc_meta(&config.metadata, &svc, 4191);
292
293 // There should be a default route.
294 detect_http_routes(&config, |routes| {
295 let route = assert_singleton(routes);
296 assert_route_is_default(route, &svc, 4191);
297 });
298
299 let backends = ["invalid-backend"];
300 let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
301 Some(&backends),
302 None,
303 None,
304 );
305 let _route = create(&client, route.build()).await;
306
307 let config = rx
308 .next()
309 .await
310 .expect("watch must not fail")
311 .expect("watch must return an updated config");
312 tracing::trace!(?config);
313
314 assert_svc_meta(&config.metadata, &svc, 4191);
315
316 // There should be a route with a backend.
317 detect_http_routes(&config, |routes| {
318 let route = assert_singleton(routes);
319 let backends = route_backends_random_available(route);
320 let backend = assert_singleton(backends);
321 assert_backend_has_failure_filter(backend);
322 });
323 })
324 .await;
325}
326
327// TODO: Investigate why the policy controller is only returning one route in this
328// case instead of two.
329#[tokio::test(flavor = "current_thread")]
330async fn service_with_multiple_http_routes() {
331 with_temp_ns(|client, ns| async move {
332 // Create a service
333 let svc = create_service(&client, &ns, "my-svc", 4191).await;
334
335 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
336 let config = rx
337 .next()
338 .await
339 .expect("watch must not fail")
340 .expect("watch must return an initial config");
341 tracing::trace!(?config);
342
343 assert_svc_meta(&config.metadata, &svc, 4191);
344
345 // There should be a default route.
346 detect_http_routes(&config, |routes| {
347 let route = assert_singleton(routes);
348 assert_route_is_default(route, &svc, 4191);
349 });
350
351 // Routes should be returned in sorted order by creation timestamp then
352 // name. To ensure that this test isn't timing dependant, routes should
353 // be created in alphabetical order.
354 let _a_route = create(
355 &client,
356 mk_http_route(&ns, "a-route", &svc, Some(4191)).build(),
357 )
358 .await;
359
360 // First route update.
361 let config = rx
362 .next()
363 .await
364 .expect("watch must not fail")
365 .expect("watch must return an updated config");
366 tracing::trace!(?config);
367
368 assert_svc_meta(&config.metadata, &svc, 4191);
369
370 let _b_route = create(
371 &client,
372 mk_http_route(&ns, "b-route", &svc, Some(4191)).build(),
373 )
374 .await;
375
376 // Second route update.
377 let config = rx
378 .next()
379 .await
380 .expect("watch must not fail")
381 .expect("watch must return an updated config");
382 tracing::trace!(?config);
383
384 assert_svc_meta(&config.metadata, &svc, 4191);
385
386 // There should be 2 routes, returned in order.
387 detect_http_routes(&config, |routes| {
388 assert_eq!(routes.len(), 2);
389 assert_eq!(route_name(&routes[0]), "a-route");
390 assert_eq!(route_name(&routes[1]), "b-route");
391 });
392 })
393 .await;
394}
395
396#[tokio::test(flavor = "current_thread")]
397async fn service_with_consecutive_failure_accrual() {
398 with_temp_ns(|client, ns| async move {
399 let svc = create_annotated_service(
400 &client,
401 &ns,
402 "consecutive-accrual-svc",
403 80,
404 BTreeMap::from([
405 (
406 "balancer.linkerd.io/failure-accrual".to_string(),
407 "consecutive".to_string(),
408 ),
409 (
410 "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
411 "8".to_string(),
412 ),
413 (
414 "balancer.linkerd.io/failure-accrual-consecutive-min-penalty".to_string(),
415 "10s".to_string(),
416 ),
417 (
418 "balancer.linkerd.io/failure-accrual-consecutive-max-penalty".to_string(),
419 "10m".to_string(),
420 ),
421 (
422 "balancer.linkerd.io/failure-accrual-consecutive-jitter-ratio".to_string(),
423 "1.0".to_string(),
424 ),
425 ]),
426 )
427 .await;
428
429 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
430 let config = rx
431 .next()
432 .await
433 .expect("watch must not fail")
434 .expect("watch must return an initial config");
435 tracing::trace!(?config);
436
437 assert_svc_meta(&config.metadata, &svc, 4191);
438
439 detect_failure_accrual(&config, |accrual| {
440 let consecutive = failure_accrual_consecutive(accrual);
441 assert_eq!(8, consecutive.max_failures);
442 assert_eq!(
443 &grpc::outbound::ExponentialBackoff {
444 min_backoff: Some(Duration::from_secs(10).try_into().unwrap()),
445 max_backoff: Some(Duration::from_secs(600).try_into().unwrap()),
446 jitter_ratio: 1.0_f32,
447 },
448 consecutive
449 .backoff
450 .as_ref()
451 .expect("backoff must be configured")
452 );
453 });
454 })
455 .await;
456}
457
458#[tokio::test(flavor = "current_thread")]
459async fn service_with_consecutive_failure_accrual_defaults() {
460 with_temp_ns(|client, ns| async move {
461 // Create a service configured to do consecutive failure accrual, but
462 // with no additional configuration
463 let svc = create_annotated_service(
464 &client,
465 &ns,
466 "default-accrual-svc",
467 80,
468 BTreeMap::from([(
469 "balancer.linkerd.io/failure-accrual".to_string(),
470 "consecutive".to_string(),
471 )]),
472 )
473 .await;
474
475 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
476 let config = rx
477 .next()
478 .await
479 .expect("watch must not fail")
480 .expect("watch must return an initial config");
481 tracing::trace!(?config);
482
483 assert_svc_meta(&config.metadata, &svc, 4191);
484
485 // Expect default max_failures and default backoff
486 detect_failure_accrual(&config, |accrual| {
487 let consecutive = failure_accrual_consecutive(accrual);
488 assert_eq!(7, consecutive.max_failures);
489 assert_default_accrual_backoff!(consecutive
490 .backoff
491 .as_ref()
492 .expect("backoff must be configured"));
493 });
494
495 // Create a service configured to do consecutive failure accrual with
496 // max number of failures and with default backoff
497 let svc = create_annotated_service(
498 &client,
499 &ns,
500 "no-backoff-svc",
501 80,
502 BTreeMap::from([
503 (
504 "balancer.linkerd.io/failure-accrual".to_string(),
505 "consecutive".to_string(),
506 ),
507 (
508 "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
509 "8".to_string(),
510 ),
511 ]),
512 )
513 .await;
514
515 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
516 let config = rx
517 .next()
518 .await
519 .expect("watch must not fail")
520 .expect("watch must return an initial config");
521 tracing::trace!(?config);
522
523 // Expect default backoff and overridden max_failures
524 detect_failure_accrual(&config, |accrual| {
525 let consecutive = failure_accrual_consecutive(accrual);
526 assert_eq!(8, consecutive.max_failures);
527 assert_default_accrual_backoff!(consecutive
528 .backoff
529 .as_ref()
530 .expect("backoff must be configured"));
531 });
532
533 // Create a service configured to do consecutive failure accrual with
534 // only the jitter ratio configured in the backoff
535 let svc = create_annotated_service(
536 &client,
537 &ns,
538 "only-jitter-svc",
539 80,
540 BTreeMap::from([
541 (
542 "balancer.linkerd.io/failure-accrual".to_string(),
543 "consecutive".to_string(),
544 ),
545 (
546 "balancer.linkerd.io/failure-accrual-consecutive-jitter-ratio".to_string(),
547 "1.0".to_string(),
548 ),
549 ]),
550 )
551 .await;
552
553 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
554 let config = rx
555 .next()
556 .await
557 .expect("watch must not fail")
558 .expect("watch must return an initial config");
559 tracing::trace!(?config);
560
561 // Expect defaults for everything except for the jitter ratio
562 detect_failure_accrual(&config, |accrual| {
563 let consecutive = failure_accrual_consecutive(accrual);
564 assert_eq!(7, consecutive.max_failures);
565 assert_eq!(
566 &grpc::outbound::ExponentialBackoff {
567 min_backoff: Some(Duration::from_secs(1).try_into().unwrap()),
568 max_backoff: Some(Duration::from_secs(60).try_into().unwrap()),
569 jitter_ratio: 1.0_f32,
570 },
571 consecutive
572 .backoff
573 .as_ref()
574 .expect("backoff must be configured")
575 );
576 });
577 })
578 .await;
579}
580
581#[tokio::test(flavor = "current_thread")]
582async fn service_with_default_failure_accrual() {
583 with_temp_ns(|client, ns| async move {
584 // Default config for Service, no failure accrual
585 let svc = create_service(&client, &ns, "default-failure-accrual", 80).await;
586
587 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
588 let config = rx
589 .next()
590 .await
591 .expect("watch must not fail")
592 .expect("watch must return an initial config");
593 tracing::trace!(?config);
594
595 // Expect failure accrual config to be default (no failure accrual)
596 detect_failure_accrual(&config, |accrual| {
597 assert!(
598 accrual.is_none(),
599 "consecutive failure accrual should not be configured for service"
600 );
601 });
602
603 // Create Service with consecutive failure accrual config for
604 // max_failures but no mode
605 let svc = create_annotated_service(
606 &client,
607 &ns,
608 "default-max-failure-svc",
609 80,
610 BTreeMap::from([(
611 "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
612 "8".to_string(),
613 )]),
614 )
615 .await;
616
617 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
618 let config = rx
619 .next()
620 .await
621 .expect("watch must not fail")
622 .expect("watch must return an initial config");
623 tracing::trace!(?config);
624
625 // Expect failure accrual config to be default (no failure accrual)
626 detect_failure_accrual(&config, |accrual| {
627 assert!(
628 accrual.is_none(),
629 "consecutive failure accrual should not be configured for service"
630 )
631 });
632 })
633 .await;
634}
635
636#[tokio::test(flavor = "current_thread")]
637async fn opaque_service() {
638 with_temp_ns(|client, ns| async move {
639 // Create a service
640 let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await;
641
642 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
643 let config = rx
644 .next()
645 .await
646 .expect("watch must not fail")
647 .expect("watch must return an initial config");
648 tracing::trace!(?config);
649
650 assert_svc_meta(&config.metadata, &svc, 4191);
651
652 // Proxy protocol should be opaque.
653 match config.protocol.unwrap().kind.unwrap() {
654 grpc::outbound::proxy_protocol::Kind::Opaque(_) => {}
655 _ => panic!("proxy protocol must be Opaque"),
656 };
657 })
658 .await;
659}
660
661#[tokio::test(flavor = "current_thread")]
662async fn route_rule_with_filters() {
663 with_temp_ns(|client, ns| async move {
664 // Create a service
665 let svc = create_service(&client, &ns, "my-svc", 4191).await;
666
667 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
668 let config = rx
669 .next()
670 .await
671 .expect("watch must not fail")
672 .expect("watch must return an initial config");
673 tracing::trace!(?config);
674
675 // There should be a default route.
676 detect_http_routes(&config, |routes| {
677 let route = assert_singleton(routes);
678 assert_route_is_default(route, &svc, 4191);
679 });
680
681 let backend_name = "backend";
682 let backends = [backend_name];
683 let route = mk_http_route(
684 &ns,
685 "foo-route",
686 &svc,
687 Some(4191),
688 ).with_backends(Some(&backends), None, None).with_filters(Some(vec![
689 k8s::policy::httproute::HttpRouteFilter::RequestHeaderModifier {
690 request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
691 set: Some(vec![k8s_gateway_api::HttpHeader {
692 name: "set".to_string(),
693 value: "set-value".to_string(),
694 }]),
695 add: Some(vec![k8s_gateway_api::HttpHeader {
696 name: "add".to_string(),
697 value: "add-value".to_string(),
698 }]),
699 remove: Some(vec!["remove".to_string()]),
700 },
701 },
702 k8s::policy::httproute::HttpRouteFilter::RequestRedirect {
703 request_redirect: k8s_gateway_api::HttpRequestRedirectFilter {
704 scheme: Some("http".to_string()),
705 hostname: Some("host".to_string()),
706 path: Some(k8s_gateway_api::HttpPathModifier::ReplacePrefixMatch {
707 replace_prefix_match: "/path".to_string(),
708 }),
709 port: Some(5555),
710 status_code: Some(302),
711 },
712 },
713 ]));
714 let _route = create(
715 &client,
716 route.build(),
717 )
718 .await;
719
720 let config = rx
721 .next()
722 .await
723 .expect("watch must not fail")
724 .expect("watch must return an updated config");
725 tracing::trace!(?config);
726
727 // There should be a route with filters.
728 detect_http_routes(&config, |routes| {
729 let route = assert_singleton(routes);
730 let rule = assert_singleton(&route.rules);
731 let filters = &rule.filters;
732 assert_eq!(
733 *filters,
734 vec![
735 grpc::outbound::http_route::Filter {
736 kind: Some(
737 grpc::outbound::http_route::filter::Kind::RequestHeaderModifier(
738 grpc::http_route::RequestHeaderModifier {
739 add: Some(grpc::http_types::Headers {
740 headers: vec![grpc::http_types::headers::Header {
741 name: "add".to_string(),
742 value: "add-value".into(),
743 }]
744 }),
745 set: Some(grpc::http_types::Headers {
746 headers: vec![grpc::http_types::headers::Header {
747 name: "set".to_string(),
748 value: "set-value".into(),
749 }]
750 }),
751 remove: vec!["remove".to_string()],
752 }
753 )
754 )
755 },
756 grpc::outbound::http_route::Filter {
757 kind: Some(grpc::outbound::http_route::filter::Kind::Redirect(
758 grpc::http_route::RequestRedirect {
759 scheme: Some(grpc::http_types::Scheme {
760 r#type: Some(grpc::http_types::scheme::Type::Registered(
761 grpc::http_types::scheme::Registered::Http.into(),
762 ))
763 }),
764 host: "host".to_string(),
765 path: Some(linkerd2_proxy_api::http_route::PathModifier { replace: Some(linkerd2_proxy_api::http_route::path_modifier::Replace::Prefix("/path".to_string())) }),
766 port: 5555,
767 status: 302,
768 }
769 ))
770 }
771 ]
772 );
773 });
774 })
775 .await;
776}
777
778#[tokio::test(flavor = "current_thread")]
779async fn backend_with_filters() {
780 with_temp_ns(|client, ns| async move {
781 // Create a service
782 let svc = create_service(&client, &ns, "my-svc", 4191).await;
783
784 let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
785 let config = rx
786 .next()
787 .await
788 .expect("watch must not fail")
789 .expect("watch must return an initial config");
790 tracing::trace!(?config);
791
792 assert_svc_meta(&config.metadata, &svc, 4191);
793
794 // There should be a default route.
795 detect_http_routes(&config, |routes| {
796 let route = assert_singleton(routes);
797 assert_route_is_default(route, &svc, 4191);
798 });
799
800 let backend_name = "backend";
801 let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
802 let backends = [backend_name];
803 let route = mk_http_route(
804 &ns,
805 "foo-route",
806 &svc,
807 Some(4191)
808 ).with_backends(Some(&backends), None, Some(vec![
809 k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
810 request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
811 set: Some(vec![k8s_gateway_api::HttpHeader {
812 name: "set".to_string(),
813 value: "set-value".to_string(),
814 }]),
815 add: Some(vec![k8s_gateway_api::HttpHeader {
816 name: "add".to_string(),
817 value: "add-value".to_string(),
818 }]),
819 remove: Some(vec!["remove".to_string()]),
820 },
821 },
822 k8s_gateway_api::HttpRouteFilter::RequestRedirect {
823 request_redirect: k8s_gateway_api::HttpRequestRedirectFilter {
824 scheme: Some("http".to_string()),
825 hostname: Some("host".to_string()),
826 path: Some(k8s_gateway_api::HttpPathModifier::ReplacePrefixMatch {
827 replace_prefix_match: "/path".to_string(),
828 }),
829 port: Some(5555),
830 status_code: Some(302),
831 },
832 },
833 ]));
834 let _route = create(&client, route.build())
835 .await;
836
837 let config = rx
838 .next()
839 .await
840 .expect("watch must not fail")
841 .expect("watch must return an updated config");
842 tracing::trace!(?config);
843
844 assert_svc_meta(&config.metadata, &svc, 4191);
845
846 // There should be a route without rule filters.
847 detect_http_routes(&config, |routes| {
848 let route = assert_singleton(routes);
849 let rule = assert_singleton(&route.rules);
850 assert_eq!(rule.filters.len(), 0);
851 let backends = route_backends_random_available(route);
852 let backend = assert_singleton(backends);
853 assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
854 let filters = &backend.backend.as_ref().unwrap().filters;
855 assert_eq!(
856 *filters,
857 vec![
858 grpc::outbound::http_route::Filter {
859 kind: Some(
860 grpc::outbound::http_route::filter::Kind::RequestHeaderModifier(
861 grpc::http_route::RequestHeaderModifier {
862 add: Some(grpc::http_types::Headers {
863 headers: vec![grpc::http_types::headers::Header {
864 name: "add".to_string(),
865 value: "add-value".into(),
866 }]
867 }),
868 set: Some(grpc::http_types::Headers {
869 headers: vec![grpc::http_types::headers::Header {
870 name: "set".to_string(),
871 value: "set-value".into(),
872 }]
873 }),
874 remove: vec!["remove".to_string()],
875 }
876 )
877 )
878 },
879 grpc::outbound::http_route::Filter {
880 kind: Some(grpc::outbound::http_route::filter::Kind::Redirect(
881 grpc::http_route::RequestRedirect {
882 scheme: Some(grpc::http_types::Scheme {
883 r#type: Some(grpc::http_types::scheme::Type::Registered(
884 grpc::http_types::scheme::Registered::Http.into(),
885 ))
886 }),
887 host: "host".to_string(),
888 path: Some(linkerd2_proxy_api::http_route::PathModifier { replace: Some(linkerd2_proxy_api::http_route::path_modifier::Replace::Prefix("/path".to_string())) }),
889 port: 5555,
890 status: 302,
891 }
892 ))
893 }
894 ]
895 );
896 });
897 })
898 .await;
899}
900
901#[tokio::test(flavor = "current_thread")]
902async fn http_route_with_no_port() {
903 with_temp_ns(|client, ns| async move {
904 // Create a service
905 let svc = create_service(&client, &ns, "my-svc", 4191).await;
906
907 let mut rx_4191 = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
908 let config_4191 = rx_4191
909 .next()
910 .await
911 .expect("watch must not fail")
912 .expect("watch must return an initial config");
913 tracing::trace!(?config_4191);
914
915 assert_svc_meta(&config_4191.metadata, &svc, 4191);
916
917 let mut rx_9999 = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
918 let config_9999 = rx_9999
919 .next()
920 .await
921 .expect("watch must not fail")
922 .expect("watch must return an initial config");
923 tracing::trace!(?config_9999);
924
925 assert_svc_meta(&config_9999.metadata, &svc, 9999);
926
927 // There should be a default route.
928 detect_http_routes(&config_4191, |routes| {
929 let route = assert_singleton(routes);
930 assert_route_is_default(route, &svc, 4191);
931 });
932 detect_http_routes(&config_9999, |routes| {
933 let route = assert_singleton(routes);
934 assert_route_is_default(route, &svc, 9999);
935 });
936
937 let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;
938
939 let config_4191 = rx_4191
940 .next()
941 .await
942 .expect("watch must not fail")
943 .expect("watch must return an updated config");
944 tracing::trace!(?config_4191);
945
946 // The route should apply to the service.
947 detect_http_routes(&config_4191, |routes| {
948 let route = assert_singleton(routes);
949 assert_route_name_eq(route, "foo-route");
950 });
951
952 let config_9999 = rx_9999
953 .next()
954 .await
955 .expect("watch must not fail")
956 .expect("watch must return an updated config");
957 tracing::trace!(?config_9999);
958
959 // The route should apply to other ports too.
960 detect_http_routes(&config_9999, |routes| {
961 let route = assert_singleton(routes);
962 assert_route_name_eq(route, "foo-route");
963 });
964 })
965 .await;
966}
967
968#[tokio::test(flavor = "current_thread")]
969async fn producer_route() {
970 with_temp_ns(|client, ns| async move {
971 // Create a service
972 let svc = create_service(&client, &ns, "my-svc", 4191).await;
973
974 let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
975 let producer_config = producer_rx
976 .next()
977 .await
978 .expect("watch must not fail")
979 .expect("watch must return an initial config");
980 tracing::trace!(?producer_config);
981
982 let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
983 let consumer_config = consumer_rx
984 .next()
985 .await
986 .expect("watch must not fail")
987 .expect("watch must return an initial config");
988 tracing::trace!(?consumer_config);
989
990 // There should be a default route.
991 detect_http_routes(&producer_config, |routes| {
992 let route = assert_singleton(routes);
993 assert_route_is_default(route, &svc, 4191);
994 });
995 detect_http_routes(&consumer_config, |routes| {
996 let route = assert_singleton(routes);
997 assert_route_is_default(route, &svc, 4191);
998 });
999
1000 // A route created in the same namespace as its parent service is called
1001 // a producer route. It should be returned in outbound policy requests
1002 // for that service from ALL namespaces.
1003 let _route = create(
1004 &client,
1005 mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
1006 )
1007 .await;
1008
1009 let producer_config = producer_rx
1010 .next()
1011 .await
1012 .expect("watch must not fail")
1013 .expect("watch must return an updated config");
1014 tracing::trace!(?producer_config);
1015 let consumer_config = consumer_rx
1016 .next()
1017 .await
1018 .expect("watch must not fail")
1019 .expect("watch must return an initial config");
1020 tracing::trace!(?consumer_config);
1021
1022 // The route should be returned in queries from the producer namespace.
1023 detect_http_routes(&producer_config, |routes| {
1024 let route = assert_singleton(routes);
1025 assert_route_name_eq(route, "foo-route");
1026 });
1027
1028 // The route should be returned in queries from a consumer namespace.
1029 detect_http_routes(&consumer_config, |routes| {
1030 let route = assert_singleton(routes);
1031 assert_route_name_eq(route, "foo-route");
1032 });
1033 })
1034 .await;
1035}
1036
1037#[tokio::test(flavor = "current_thread")]
1038async fn pre_existing_producer_route() {
1039 // We test the scenario where outbound policy watches are initiated after
1040 // a produce route already exists.
1041 with_temp_ns(|client, ns| async move {
1042 // Create a service
1043 let svc = create_service(&client, &ns, "my-svc", 4191).await;
1044
1045 // A route created in the same namespace as its parent service is called
1046 // a producer route. It should be returned in outbound policy requests
1047 // for that service from ALL namespaces.
1048 let _route = create(
1049 &client,
1050 mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
1051 )
1052 .await;
1053
1054 let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
1055 let producer_config = producer_rx
1056 .next()
1057 .await
1058 .expect("watch must not fail")
1059 .expect("watch must return an initial config");
1060 tracing::trace!(?producer_config);
1061
1062 let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
1063 let consumer_config = consumer_rx
1064 .next()
1065 .await
1066 .expect("watch must not fail")
1067 .expect("watch must return an initial config");
1068 tracing::trace!(?consumer_config);
1069
1070 // The route should be returned in queries from the producer namespace.
1071 detect_http_routes(&producer_config, |routes| {
1072 let route = assert_singleton(routes);
1073 assert_route_name_eq(route, "foo-route");
1074 });
1075
1076 // The route should be returned in queries from a consumer namespace.
1077 detect_http_routes(&consumer_config, |routes| {
1078 let route = assert_singleton(routes);
1079 assert_route_name_eq(route, "foo-route");
1080 });
1081 })
1082 .await;
1083}
1084
1085#[tokio::test(flavor = "current_thread")]
1086async fn consumer_route() {
1087 with_temp_ns(|client, ns| async move {
1088 // Create a service
1089 let svc = create_service(&client, &ns, "my-svc", 4191).await;
1090
1091 let consumer_ns_name = format!("{}-consumer", ns);
1092 let consumer_ns = create_cluster_scoped(
1093 &client,
1094 k8s::Namespace {
1095 metadata: k8s::ObjectMeta {
1096 name: Some(consumer_ns_name.clone()),
1097 labels: Some(convert_args!(btreemap!(
1098 "linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
1099 ))),
1100 ..Default::default()
1101 },
1102 ..Default::default()
1103 },
1104 )
1105 .await;
1106
1107 let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
1108 let producer_config = producer_rx
1109 .next()
1110 .await
1111 .expect("watch must not fail")
1112 .expect("watch must return an initial config");
1113 tracing::trace!(?producer_config);
1114
1115 let mut consumer_rx =
1116 retry_watch_outbound_policy(&client, &consumer_ns_name, &svc, 4191).await;
1117 let consumer_config = consumer_rx
1118 .next()
1119 .await
1120 .expect("watch must not fail")
1121 .expect("watch must return an initial config");
1122 tracing::trace!(?consumer_config);
1123
1124 let mut other_rx = retry_watch_outbound_policy(&client, "other_ns", &svc, 4191).await;
1125 let other_config = other_rx
1126 .next()
1127 .await
1128 .expect("watch must not fail")
1129 .expect("watch must return an initial config");
1130 tracing::trace!(?other_config);
1131
1132 // There should be a default route.
1133 detect_http_routes(&producer_config, |routes| {
1134 let route = assert_singleton(routes);
1135 assert_route_is_default(route, &svc, 4191);
1136 });
1137 detect_http_routes(&consumer_config, |routes| {
1138 let route = assert_singleton(routes);
1139 assert_route_is_default(route, &svc, 4191);
1140 });
1141 detect_http_routes(&other_config, |routes| {
1142 let route = assert_singleton(routes);
1143 assert_route_is_default(route, &svc, 4191);
1144 });
1145
1146 // A route created in a different namespace as its parent service is
1147 // called a consumer route. It should be returned in outbound policy
1148 // requests for that service ONLY when the request comes from the
1149 // consumer namespace.
1150 let _route = create(
1151 &client,
1152 mk_http_route(&consumer_ns_name, "foo-route", &svc, Some(4191)).build(),
1153 )
1154 .await;
1155
1156 // The route should NOT be returned in queries from the producer namespace.
1157 // There should be a default route.
1158 assert!(producer_rx.next().now_or_never().is_none());
1159
1160 // The route should be returned in queries from the same consumer
1161 // namespace.
1162 let consumer_config = consumer_rx
1163 .next()
1164 .await
1165 .expect("watch must not fail")
1166 .expect("watch must return an initial config");
1167 tracing::trace!(?consumer_config);
1168 detect_http_routes(&consumer_config, |routes| {
1169 let route = assert_singleton(routes);
1170 assert_route_name_eq(route, "foo-route");
1171 });
1172
1173 // The route should NOT be returned in queries from a different consumer
1174 // namespace.
1175 assert!(other_rx.next().now_or_never().is_none());
1176
1177 delete_cluster_scoped(&client, consumer_ns).await;
1178 })
1179 .await;
1180}
1181
1182/* Helpers */
1183
// Test-only builder wrapping a `k8s::policy::HttpRoute`. The wrapped route is
// adjusted through the `with_*` methods and extracted with `build`.
struct HttpRouteBuilder(k8s::policy::HttpRoute);
1185
1186async fn retry_watch_outbound_policy(
1187 client: &kube::Client,
1188 ns: &str,
1189 svc: &k8s::Service,
1190 port: u16,
1191) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
1192 // Port-forward to the control plane and start watching the service's
1193 // outbound policy.
1194 let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
1195 loop {
1196 match policy_api.watch(ns, svc, port).await {
1197 Ok(rx) => return rx,
1198 Err(error) => {
1199 tracing::error!(
1200 ?error,
1201 ns,
1202 svc = svc.name_unchecked(),
1203 "failed to watch outbound policy for port 4191"
1204 );
1205 time::sleep(Duration::from_secs(1)).await;
1206 }
1207 }
1208 }
1209}
1210
1211fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
1212 use k8s::policy::httproute as api;
1213
1214 HttpRouteBuilder(api::HttpRoute {
1215 metadata: kube::api::ObjectMeta {
1216 namespace: Some(ns.to_string()),
1217 name: Some(name.to_string()),
1218 ..Default::default()
1219 },
1220 spec: api::HttpRouteSpec {
1221 inner: api::CommonRouteSpec {
1222 parent_refs: Some(vec![api::ParentReference {
1223 group: Some("core".to_string()),
1224 kind: Some("Service".to_string()),
1225 namespace: svc.namespace(),
1226 name: svc.name_unchecked(),
1227 section_name: None,
1228 port,
1229 }]),
1230 },
1231 hostnames: None,
1232 rules: Some(vec![api::HttpRouteRule {
1233 matches: Some(vec![api::HttpRouteMatch {
1234 path: Some(api::HttpPathMatch::Exact {
1235 value: "/foo".to_string(),
1236 }),
1237 headers: None,
1238 query_params: None,
1239 method: Some("GET".to_string()),
1240 }]),
1241 filters: None,
1242 backend_refs: None,
1243 timeouts: None,
1244 }]),
1245 },
1246 status: None,
1247 })
1248}
1249
1250impl HttpRouteBuilder {
1251 fn with_backends(
1252 self,
1253 backends: Option<&[&str]>,
1254 backends_ns: Option<String>,
1255 backend_filters: Option<Vec<k8s_gateway_api::HttpRouteFilter>>,
1256 ) -> Self {
1257 let mut route = self.0;
1258 let backend_refs = backends.map(|names| {
1259 names
1260 .iter()
1261 .map(|name| k8s::policy::httproute::HttpBackendRef {
1262 backend_ref: Some(k8s_gateway_api::BackendRef {
1263 weight: None,
1264 inner: k8s_gateway_api::BackendObjectReference {
1265 name: name.to_string(),
1266 port: Some(8888),
1267 group: None,
1268 kind: None,
1269 namespace: backends_ns.clone(),
1270 },
1271 }),
1272 filters: backend_filters.clone(),
1273 })
1274 .collect()
1275 });
1276 route.spec.rules.iter_mut().flatten().for_each(|rule| {
1277 rule.backend_refs = backend_refs.clone();
1278 });
1279 Self(route)
1280 }
1281
1282 fn with_filters(self, filters: Option<Vec<k8s::policy::httproute::HttpRouteFilter>>) -> Self {
1283 let mut route = self.0;
1284 route
1285 .spec
1286 .rules
1287 .iter_mut()
1288 .flatten()
1289 .for_each(|rule| rule.filters = filters.clone());
1290 Self(route)
1291 }
1292
1293 fn build(self) -> k8s::policy::HttpRoute {
1294 self.0
1295 }
1296}
1297
1298fn mk_empty_http_route(
1299 ns: &str,
1300 name: &str,
1301 svc: &k8s::Service,
1302 port: u16,
1303) -> k8s::policy::HttpRoute {
1304 use k8s::policy::httproute as api;
1305 api::HttpRoute {
1306 metadata: kube::api::ObjectMeta {
1307 namespace: Some(ns.to_string()),
1308 name: Some(name.to_string()),
1309 ..Default::default()
1310 },
1311 spec: api::HttpRouteSpec {
1312 inner: api::CommonRouteSpec {
1313 parent_refs: Some(vec![api::ParentReference {
1314 group: Some("core".to_string()),
1315 kind: Some("Service".to_string()),
1316 namespace: svc.namespace(),
1317 name: svc.name_unchecked(),
1318 section_name: None,
1319 port: Some(port),
1320 }]),
1321 },
1322 hostnames: None,
1323 rules: Some(vec![]),
1324 },
1325 status: None,
1326 }
1327}
1328
1329// detect_http_routes asserts that the given outbound policy has a proxy protcol
1330// of "Detect" and then invokes the given function with the Http1 and Http2
1331// routes from the Detect.
1332#[track_caller]
1333fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
1334where
1335 F: Fn(&[grpc::outbound::HttpRoute]),
1336{
1337 let kind = config
1338 .protocol
1339 .as_ref()
1340 .expect("must have proxy protocol")
1341 .kind
1342 .as_ref()
1343 .expect("must have kind");
1344 if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
1345 opaque: _,
1346 timeout: _,
1347 http1,
1348 http2,
1349 }) = kind
1350 {
1351 let http1 = http1
1352 .as_ref()
1353 .expect("proxy protocol must have http1 field");
1354 let http2 = http2
1355 .as_ref()
1356 .expect("proxy protocol must have http2 field");
1357 f(&http1.routes);
1358 f(&http2.routes);
1359 } else {
1360 panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
1361 }
1362}
1363
1364#[track_caller]
1365fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
1366where
1367 F: Fn(Option<&grpc::outbound::FailureAccrual>),
1368{
1369 let kind = config
1370 .protocol
1371 .as_ref()
1372 .expect("must have proxy protocol")
1373 .kind
1374 .as_ref()
1375 .expect("must have kind");
1376 if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
1377 opaque: _,
1378 timeout: _,
1379 http1,
1380 http2,
1381 }) = kind
1382 {
1383 let http1 = http1
1384 .as_ref()
1385 .expect("proxy protocol must have http1 field");
1386 let http2 = http2
1387 .as_ref()
1388 .expect("proxy protocol must have http2 field");
1389 f(http1.failure_accrual.as_ref());
1390 f(http2.failure_accrual.as_ref());
1391 } else {
1392 panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
1393 }
1394}
1395
1396#[track_caller]
1397fn failure_accrual_consecutive(
1398 accrual: Option<&grpc::outbound::FailureAccrual>,
1399) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
1400 assert!(
1401 accrual.is_some(),
1402 "failure accrual must be configured for service"
1403 );
1404 let kind = accrual
1405 .unwrap()
1406 .kind
1407 .as_ref()
1408 .expect("failure accrual must have kind");
1409 let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
1410 accrual
1411}
1412
1413#[track_caller]
1414fn route_backends_first_available(
1415 route: &grpc::outbound::HttpRoute,
1416) -> &[grpc::outbound::http_route::RouteBackend] {
1417 let kind = assert_singleton(&route.rules)
1418 .backends
1419 .as_ref()
1420 .expect("Rule must have backends")
1421 .kind
1422 .as_ref()
1423 .expect("Backend must have kind");
1424 match kind {
1425 grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
1426 _ => panic!("Distribution must be FirstAvailable"),
1427 }
1428}
1429
1430#[track_caller]
1431fn route_backends_random_available(
1432 route: &grpc::outbound::HttpRoute,
1433) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
1434 let kind = assert_singleton(&route.rules)
1435 .backends
1436 .as_ref()
1437 .expect("Rule must have backends")
1438 .kind
1439 .as_ref()
1440 .expect("Backend must have kind");
1441 match kind {
1442 grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
1443 _ => panic!("Distribution must be RandomAvailable"),
1444 }
1445}
1446
1447#[track_caller]
1448fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
1449 match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
1450 grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
1451 _ => panic!("route must be a resource kind"),
1452 }
1453}
1454
1455#[track_caller]
1456fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::WeightedRouteBackend) {
1457 let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
1458 match filter.kind.as_ref().unwrap() {
1459 grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
1460 _ => panic!("backend must have FailureInjector filter"),
1461 };
1462}
1463
1464#[track_caller]
1465fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
1466 let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
1467 match kind {
1468 grpc::meta::metadata::Kind::Default(_) => {}
1469 grpc::meta::metadata::Kind::Resource(r) => {
1470 panic!("route expected to be default but got resource {r:?}")
1471 }
1472 }
1473
1474 let backends = route_backends_first_available(route);
1475 let backend = assert_singleton(backends);
1476 assert_backend_matches_service(backend, svc, port);
1477
1478 let rule = assert_singleton(&route.rules);
1479 let route_match = assert_singleton(&rule.matches);
1480 let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
1481 assert_eq!(
1482 *path_match,
1483 grpc::http_route::path_match::Kind::Prefix("/".to_string())
1484 );
1485}
1486
1487#[track_caller]
1488fn assert_backend_matches_service(
1489 backend: &grpc::outbound::http_route::RouteBackend,
1490 svc: &k8s::Service,
1491 port: u16,
1492) {
1493 let backend = backend.backend.as_ref().unwrap();
1494 let dst = match backend.kind.as_ref().unwrap() {
1495 grpc::outbound::backend::Kind::Balancer(balance) => {
1496 let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
1497 match kind {
1498 grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
1499 }
1500 }
1501 grpc::outbound::backend::Kind::Forward(_) => {
1502 panic!("default route backend must be Balancer")
1503 }
1504 };
1505 assert_eq!(
1506 *dst,
1507 format!(
1508 "{}.{}.svc.{}:{}",
1509 svc.name_unchecked(),
1510 svc.namespace().unwrap(),
1511 "cluster.local",
1512 port
1513 )
1514 );
1515
1516 assert_svc_meta(&backend.metadata, svc, port)
1517}
1518
// Asserts that the slice holds exactly one element and returns a reference to
// that element.
#[track_caller]
fn assert_singleton<T>(ts: &[T]) -> &T {
    let count = ts.len();
    assert_eq!(count, 1);
    &ts[0]
}
1524
1525#[track_caller]
1526fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
1527 let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
1528 match kind {
1529 grpc::meta::metadata::Kind::Default(d) => {
1530 panic!("route expected to not be default, but got default {d:?}")
1531 }
1532 grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
1533 }
1534}
View as plain text