...

Text file src/github.com/linkerd/linkerd2/policy-test/tests/outbound_api_gateway.rs

Documentation: github.com/linkerd/linkerd2/policy-test/tests

     1use futures::prelude::*;
     2use kube::ResourceExt;
     3use linkerd_policy_controller_k8s_api as k8s;
     4use linkerd_policy_test::{
     5    assert_default_accrual_backoff, assert_svc_meta, create, create_annotated_service,
     6    create_cluster_scoped, create_opaque_service, create_service, delete_cluster_scoped, grpc,
     7    mk_service, with_temp_ns,
     8};
     9use maplit::{btreemap, convert_args};
    10use std::{collections::BTreeMap, time::Duration};
    11use tokio::time;
    12
// These tests are copies of the tests in outbound_api_linkerd.rs but using the
// Gateway API HttpRoute kubernetes types instead of the policy.linkerd.io ones.
// These two files should be kept in sync to ensure that Linkerd can read and
// function correctly with both types of resources.
    17
    18#[tokio::test(flavor = "current_thread")]
    19async fn service_does_not_exist() {
    20    with_temp_ns(|client, ns| async move {
    21        // Build a service but don't apply it to the cluster.
    22        let mut svc = mk_service(&ns, "my-svc", 4191);
    23        // Give it a bogus cluster ip.
    24        svc.spec.as_mut().unwrap().cluster_ip = Some("1.1.1.1".to_string());
    25
    26        let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(&client).await;
    27        let rsp = policy_api.watch(&ns, &svc, 4191).await;
    28
    29        assert!(rsp.is_err());
    30        assert_eq!(rsp.err().unwrap().code(), tonic::Code::NotFound);
    31    })
    32    .await;
    33}
    34
    35#[tokio::test(flavor = "current_thread")]
    36async fn service_with_no_http_routes() {
    37    with_temp_ns(|client, ns| async move {
    38        // Create a service
    39        let svc = create_service(&client, &ns, "my-svc", 4191).await;
    40
    41        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
    42        let config = rx
    43            .next()
    44            .await
    45            .expect("watch must not fail")
    46            .expect("watch must return an initial config");
    47        tracing::trace!(?config);
    48
    49        assert_svc_meta(&config.metadata, &svc, 4191);
    50
    51        // There should be a default route.
    52        detect_http_routes(&config, |routes| {
    53            let route = assert_singleton(routes);
    54            assert_route_is_default(route, &svc, 4191);
    55        });
    56    })
    57    .await;
    58}
    59
    60#[tokio::test(flavor = "current_thread")]
    61async fn service_with_http_route_without_rules() {
    62    with_temp_ns(|client, ns| async move {
    63        // Create a service
    64        let svc = create_service(&client, &ns, "my-svc", 4191).await;
    65
    66        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
    67        let config = rx
    68            .next()
    69            .await
    70            .expect("watch must not fail")
    71            .expect("watch must return an initial config");
    72        tracing::trace!(?config);
    73
    74        assert_svc_meta(&config.metadata, &svc, 4191);
    75
    76        // There should be a default route.
    77        detect_http_routes(&config, |routes| {
    78            let route = assert_singleton(routes);
    79            assert_route_is_default(route, &svc, 4191);
    80        });
    81
    82        let _route = create(&client, mk_empty_http_route(&ns, "foo-route", &svc, 4191)).await;
    83
    84        let config = rx
    85            .next()
    86            .await
    87            .expect("watch must not fail")
    88            .expect("watch must return an updated config");
    89        tracing::trace!(?config);
    90
    91        assert_svc_meta(&config.metadata, &svc, 4191);
    92
    93        // There should be a route with no rules.
    94        detect_http_routes(&config, |routes| {
    95            let route = assert_singleton(routes);
    96            assert_eq!(route.rules.len(), 0);
    97        });
    98    })
    99    .await;
   100}
   101
   102#[tokio::test(flavor = "current_thread")]
   103async fn service_with_http_routes_without_backends() {
   104    with_temp_ns(|client, ns| async move {
   105        // Create a service
   106        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   107
   108        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   109        let config = rx
   110            .next()
   111            .await
   112            .expect("watch must not fail")
   113            .expect("watch must return an initial config");
   114        tracing::trace!(?config);
   115
   116        assert_svc_meta(&config.metadata, &svc, 4191);
   117
   118        // There should be a default route.
   119        detect_http_routes(&config, |routes| {
   120            let route = assert_singleton(routes);
   121            assert_route_is_default(route, &svc, 4191);
   122        });
   123
   124        let _route = create(
   125            &client,
   126            mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
   127        )
   128        .await;
   129
   130        let config = rx
   131            .next()
   132            .await
   133            .expect("watch must not fail")
   134            .expect("watch must return an updated config");
   135        tracing::trace!(?config);
   136
   137        assert_svc_meta(&config.metadata, &svc, 4191);
   138
   139        // There should be a route with the logical backend.
   140        detect_http_routes(&config, |routes| {
   141            let route = assert_singleton(routes);
   142            let backends = route_backends_first_available(route);
   143            let backend = assert_singleton(backends);
   144            assert_backend_matches_service(backend, &svc, 4191);
   145        });
   146    })
   147    .await;
   148}
   149
   150#[tokio::test(flavor = "current_thread")]
   151async fn service_with_http_routes_with_backend() {
   152    with_temp_ns(|client, ns| async move {
   153        // Create a service
   154        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   155
   156        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   157        let config = rx
   158            .next()
   159            .await
   160            .expect("watch must not fail")
   161            .expect("watch must return an initial config");
   162        tracing::trace!(?config);
   163
   164        assert_svc_meta(&config.metadata, &svc, 4191);
   165
   166        // There should be a default route.
   167        detect_http_routes(&config, |routes| {
   168            let route = assert_singleton(routes);
   169            assert_route_is_default(route, &svc, 4191);
   170        });
   171
   172        let backend_name = "backend";
   173        let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
   174        let backends = [backend_name];
   175        let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
   176            Some(&backends),
   177            None,
   178            None,
   179        );
   180        let _route = create(&client, route.build()).await;
   181
   182        let config = rx
   183            .next()
   184            .await
   185            .expect("watch must not fail")
   186            .expect("watch must return an updated config");
   187        tracing::trace!(?config);
   188
   189        assert_svc_meta(&config.metadata, &svc, 4191);
   190
   191        // There should be a route with a backend with no filters.
   192        detect_http_routes(&config, |routes| {
   193            let route = assert_singleton(routes);
   194            let backends = route_backends_random_available(route);
   195            let backend = assert_singleton(backends);
   196            assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
   197            let filters = &backend.backend.as_ref().unwrap().filters;
   198            assert_eq!(filters.len(), 0);
   199        });
   200    })
   201    .await;
   202}
   203
   204#[tokio::test(flavor = "current_thread")]
   205async fn service_with_http_routes_with_cross_namespace_backend() {
   206    with_temp_ns(|client, ns| async move {
   207        // Create a service
   208        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   209
   210        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   211        let config = rx
   212            .next()
   213            .await
   214            .expect("watch must not fail")
   215            .expect("watch must return an initial config");
   216        tracing::trace!(?config);
   217
   218        assert_svc_meta(&config.metadata, &svc, 4191);
   219
   220        // There should be a default route.
   221        detect_http_routes(&config, |routes| {
   222            let route = assert_singleton(routes);
   223            assert_route_is_default(route, &svc, 4191);
   224        });
   225
   226        let backend_ns_name = format!("{}-backend", ns);
   227        let backend_ns = create_cluster_scoped(
   228            &client,
   229            k8s::Namespace {
   230                metadata: k8s::ObjectMeta {
   231                    name: Some(backend_ns_name.clone()),
   232                    labels: Some(convert_args!(btreemap!(
   233                        "linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
   234                    ))),
   235                    ..Default::default()
   236                },
   237                ..Default::default()
   238            },
   239        )
   240        .await;
   241        let backend_name = "backend";
   242        let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await;
   243        let backends = [backend_name];
   244        let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
   245            Some(&backends),
   246            Some(backend_ns_name),
   247            None,
   248        );
   249        let _route = create(&client, route.build()).await;
   250
   251        let config = rx
   252            .next()
   253            .await
   254            .expect("watch must not fail")
   255            .expect("watch must return an updated config");
   256        tracing::trace!(?config);
   257
   258        assert_svc_meta(&config.metadata, &svc, 4191);
   259
   260        // There should be a route with a backend with no filters.
   261        detect_http_routes(&config, |routes| {
   262            let route = assert_singleton(routes);
   263            let backends = route_backends_random_available(route);
   264            let backend = assert_singleton(backends);
   265            assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
   266            let filters = &backend.backend.as_ref().unwrap().filters;
   267            assert_eq!(filters.len(), 0);
   268        });
   269
   270        delete_cluster_scoped(&client, backend_ns).await
   271    })
   272    .await;
   273}
   274
   275// TODO: Test fails until handling of invalid backends is implemented.
   276#[tokio::test(flavor = "current_thread")]
   277async fn service_with_http_routes_with_invalid_backend() {
   278    with_temp_ns(|client, ns| async move {
   279        // Create a service
   280        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   281
   282        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   283        let config = rx
   284            .next()
   285            .await
   286            .expect("watch must not fail")
   287            .expect("watch must return an initial config");
   288        tracing::trace!(?config);
   289
   290        assert_svc_meta(&config.metadata, &svc, 4191);
   291
   292        // There should be a default route.
   293        detect_http_routes(&config, |routes| {
   294            let route = assert_singleton(routes);
   295            assert_route_is_default(route, &svc, 4191);
   296        });
   297
   298        let backends = ["invalid-backend"];
   299        let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
   300            Some(&backends),
   301            None,
   302            None,
   303        );
   304        let _route = create(&client, route.build()).await;
   305
   306        let config = rx
   307            .next()
   308            .await
   309            .expect("watch must not fail")
   310            .expect("watch must return an updated config");
   311        tracing::trace!(?config);
   312
   313        assert_svc_meta(&config.metadata, &svc, 4191);
   314
   315        // There should be a route with a backend.
   316        detect_http_routes(&config, |routes| {
   317            let route = assert_singleton(routes);
   318            let backends = route_backends_random_available(route);
   319            let backend = assert_singleton(backends);
   320            assert_backend_has_failure_filter(backend);
   321        });
   322    })
   323    .await;
   324}
   325
   326// TODO: Investigate why the policy controller is only returning one route in this
   327// case instead of two.
   328#[tokio::test(flavor = "current_thread")]
   329async fn service_with_multiple_http_routes() {
   330    with_temp_ns(|client, ns| async move {
   331        // Create a service
   332        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   333
   334        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   335        let config = rx
   336            .next()
   337            .await
   338            .expect("watch must not fail")
   339            .expect("watch must return an initial config");
   340        tracing::trace!(?config);
   341
   342        assert_svc_meta(&config.metadata, &svc, 4191);
   343
   344        // There should be a default route.
   345        detect_http_routes(&config, |routes| {
   346            let route = assert_singleton(routes);
   347            assert_route_is_default(route, &svc, 4191);
   348        });
   349
   350        // Routes should be returned in sorted order by creation timestamp then
   351        // name. To ensure that this test isn't timing dependant, routes should
   352        // be created in alphabetical order.
   353        let _a_route = create(
   354            &client,
   355            mk_http_route(&ns, "a-route", &svc, Some(4191)).build(),
   356        )
   357        .await;
   358
   359        // First route update.
   360        let config = rx
   361            .next()
   362            .await
   363            .expect("watch must not fail")
   364            .expect("watch must return an updated config");
   365        tracing::trace!(?config);
   366
   367        assert_svc_meta(&config.metadata, &svc, 4191);
   368
   369        let _b_route = create(
   370            &client,
   371            mk_http_route(&ns, "b-route", &svc, Some(4191)).build(),
   372        )
   373        .await;
   374
   375        // Second route update.
   376        let config = rx
   377            .next()
   378            .await
   379            .expect("watch must not fail")
   380            .expect("watch must return an updated config");
   381        tracing::trace!(?config);
   382
   383        assert_svc_meta(&config.metadata, &svc, 4191);
   384
   385        // There should be 2 routes, returned in order.
   386        detect_http_routes(&config, |routes| {
   387            assert_eq!(routes.len(), 2);
   388            assert_eq!(route_name(&routes[0]), "a-route");
   389            assert_eq!(route_name(&routes[1]), "b-route");
   390        });
   391    })
   392    .await;
   393}
   394
   395#[tokio::test(flavor = "current_thread")]
   396async fn service_with_consecutive_failure_accrual() {
   397    with_temp_ns(|client, ns| async move {
   398        let svc = create_annotated_service(
   399            &client,
   400            &ns,
   401            "consecutive-accrual-svc",
   402            80,
   403            BTreeMap::from([
   404                (
   405                    "balancer.linkerd.io/failure-accrual".to_string(),
   406                    "consecutive".to_string(),
   407                ),
   408                (
   409                    "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
   410                    "8".to_string(),
   411                ),
   412                (
   413                    "balancer.linkerd.io/failure-accrual-consecutive-min-penalty".to_string(),
   414                    "10s".to_string(),
   415                ),
   416                (
   417                    "balancer.linkerd.io/failure-accrual-consecutive-max-penalty".to_string(),
   418                    "10m".to_string(),
   419                ),
   420                (
   421                    "balancer.linkerd.io/failure-accrual-consecutive-jitter-ratio".to_string(),
   422                    "1.0".to_string(),
   423                ),
   424            ]),
   425        )
   426        .await;
   427
   428        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   429        let config = rx
   430            .next()
   431            .await
   432            .expect("watch must not fail")
   433            .expect("watch must return an initial config");
   434        tracing::trace!(?config);
   435
   436        detect_failure_accrual(&config, |accrual| {
   437            let consecutive = failure_accrual_consecutive(accrual);
   438            assert_eq!(8, consecutive.max_failures);
   439            assert_eq!(
   440                &grpc::outbound::ExponentialBackoff {
   441                    min_backoff: Some(Duration::from_secs(10).try_into().unwrap()),
   442                    max_backoff: Some(Duration::from_secs(600).try_into().unwrap()),
   443                    jitter_ratio: 1.0_f32,
   444                },
   445                consecutive
   446                    .backoff
   447                    .as_ref()
   448                    .expect("backoff must be configured")
   449            );
   450        });
   451    })
   452    .await;
   453}
   454
   455#[tokio::test(flavor = "current_thread")]
   456async fn service_with_consecutive_failure_accrual_defaults() {
   457    with_temp_ns(|client, ns| async move {
   458        // Create a service configured to do consecutive failure accrual, but
   459        // with no additional configuration
   460        let svc = create_annotated_service(
   461            &client,
   462            &ns,
   463            "default-accrual-svc",
   464            80,
   465            BTreeMap::from([(
   466                "balancer.linkerd.io/failure-accrual".to_string(),
   467                "consecutive".to_string(),
   468            )]),
   469        )
   470        .await;
   471
   472        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   473        let config = rx
   474            .next()
   475            .await
   476            .expect("watch must not fail")
   477            .expect("watch must return an initial config");
   478        tracing::trace!(?config);
   479
   480        // Expect default max_failures and default backoff
   481        detect_failure_accrual(&config, |accrual| {
   482            let consecutive = failure_accrual_consecutive(accrual);
   483            assert_eq!(7, consecutive.max_failures);
   484            assert_default_accrual_backoff!(consecutive
   485                .backoff
   486                .as_ref()
   487                .expect("backoff must be configured"));
   488        });
   489
   490        // Create a service configured to do consecutive failure accrual with
   491        // max number of failures and with default backoff
   492        let svc = create_annotated_service(
   493            &client,
   494            &ns,
   495            "no-backoff-svc",
   496            80,
   497            BTreeMap::from([
   498                (
   499                    "balancer.linkerd.io/failure-accrual".to_string(),
   500                    "consecutive".to_string(),
   501                ),
   502                (
   503                    "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
   504                    "8".to_string(),
   505                ),
   506            ]),
   507        )
   508        .await;
   509
   510        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   511        let config = rx
   512            .next()
   513            .await
   514            .expect("watch must not fail")
   515            .expect("watch must return an initial config");
   516        tracing::trace!(?config);
   517
   518        // Expect default backoff and overridden max_failures
   519        detect_failure_accrual(&config, |accrual| {
   520            let consecutive = failure_accrual_consecutive(accrual);
   521            assert_eq!(8, consecutive.max_failures);
   522            assert_default_accrual_backoff!(consecutive
   523                .backoff
   524                .as_ref()
   525                .expect("backoff must be configured"));
   526        });
   527
   528        // Create a service configured to do consecutive failure accrual with
   529        // only the jitter ratio configured in the backoff
   530        let svc = create_annotated_service(
   531            &client,
   532            &ns,
   533            "only-jitter-svc",
   534            80,
   535            BTreeMap::from([
   536                (
   537                    "balancer.linkerd.io/failure-accrual".to_string(),
   538                    "consecutive".to_string(),
   539                ),
   540                (
   541                    "balancer.linkerd.io/failure-accrual-consecutive-jitter-ratio".to_string(),
   542                    "1.0".to_string(),
   543                ),
   544            ]),
   545        )
   546        .await;
   547
   548        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   549        let config = rx
   550            .next()
   551            .await
   552            .expect("watch must not fail")
   553            .expect("watch must return an initial config");
   554        tracing::trace!(?config);
   555
   556        // Expect defaults for everything except for the jitter ratio
   557        detect_failure_accrual(&config, |accrual| {
   558            let consecutive = failure_accrual_consecutive(accrual);
   559            assert_eq!(7, consecutive.max_failures);
   560            assert_eq!(
   561                &grpc::outbound::ExponentialBackoff {
   562                    min_backoff: Some(Duration::from_secs(1).try_into().unwrap()),
   563                    max_backoff: Some(Duration::from_secs(60).try_into().unwrap()),
   564                    jitter_ratio: 1.0_f32,
   565                },
   566                consecutive
   567                    .backoff
   568                    .as_ref()
   569                    .expect("backoff must be configured")
   570            );
   571        });
   572    })
   573    .await;
   574}
   575
   576#[tokio::test(flavor = "current_thread")]
   577async fn service_with_default_failure_accrual() {
   578    with_temp_ns(|client, ns| async move {
   579        // Default config for Service, no failure accrual
   580        let svc = create_service(&client, &ns, "default-failure-accrual", 80).await;
   581
   582        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   583        let config = rx
   584            .next()
   585            .await
   586            .expect("watch must not fail")
   587            .expect("watch must return an initial config");
   588        tracing::trace!(?config);
   589
   590        // Expect failure accrual config to be default (no failure accrual)
   591        detect_failure_accrual(&config, |accrual| {
   592            assert!(
   593                accrual.is_none(),
   594                "consecutive failure accrual should not be configured for service"
   595            );
   596        });
   597
   598        // Create Service with consecutive failure accrual config for
   599        // max_failures but no mode
   600        let svc = create_annotated_service(
   601            &client,
   602            &ns,
   603            "default-max-failure-svc",
   604            80,
   605            BTreeMap::from([(
   606                "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
   607                "8".to_string(),
   608            )]),
   609        )
   610        .await;
   611
   612        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   613        let config = rx
   614            .next()
   615            .await
   616            .expect("watch must not fail")
   617            .expect("watch must return an initial config");
   618        tracing::trace!(?config);
   619
   620        // Expect failure accrual config to be default (no failure accrual)
   621        detect_failure_accrual(&config, |accrual| {
   622            assert!(
   623                accrual.is_none(),
   624                "consecutive failure accrual should not be configured for service"
   625            )
   626        });
   627    })
   628    .await;
   629}
   630
   631#[tokio::test(flavor = "current_thread")]
   632async fn opaque_service() {
   633    with_temp_ns(|client, ns| async move {
   634        // Create a service
   635        let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await;
   636
   637        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   638        let config = rx
   639            .next()
   640            .await
   641            .expect("watch must not fail")
   642            .expect("watch must return an initial config");
   643        tracing::trace!(?config);
   644
   645        // Proxy protocol should be opaque.
   646        match config.protocol.unwrap().kind.unwrap() {
   647            grpc::outbound::proxy_protocol::Kind::Opaque(_) => {}
   648            _ => panic!("proxy protocol must be Opaque"),
   649        };
   650    })
   651    .await;
   652}
   653
   654#[tokio::test(flavor = "current_thread")]
   655async fn route_with_filters() {
   656    with_temp_ns(|client, ns| async move {
   657        // Create a service
   658        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   659
   660        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   661        let config = rx
   662            .next()
   663            .await
   664            .expect("watch must not fail")
   665            .expect("watch must return an initial config");
   666        tracing::trace!(?config);
   667
   668        // There should be a default route.
   669        detect_http_routes(&config, |routes| {
   670            let route = assert_singleton(routes);
   671            assert_route_is_default(route, &svc, 4191);
   672        });
   673
   674        let backend_name = "backend";
   675        let backends = [backend_name];
   676        let route = mk_http_route(&ns, "foo-route", &svc, Some(4191))
   677            .with_backends(Some(&backends), None, None)
   678            .with_filters(Some(vec![
   679                k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
   680                    request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
   681                        set: Some(vec![k8s_gateway_api::HttpHeader {
   682                            name: "set".to_string(),
   683                            value: "set-value".to_string(),
   684                        }]),
   685                        add: Some(vec![k8s_gateway_api::HttpHeader {
   686                            name: "add".to_string(),
   687                            value: "add-value".to_string(),
   688                        }]),
   689                        remove: Some(vec!["remove".to_string()]),
   690                    },
   691                },
   692                k8s_gateway_api::HttpRouteFilter::RequestRedirect {
   693                    request_redirect: k8s_gateway_api::HttpRequestRedirectFilter {
   694                        scheme: Some("http".to_string()),
   695                        hostname: Some("host".to_string()),
   696                        path: Some(k8s_gateway_api::HttpPathModifier::ReplacePrefixMatch {
   697                            replace_prefix_match: "/path".to_string(),
   698                        }),
   699                        port: Some(5555),
   700                        status_code: Some(302),
   701                    },
   702                },
   703            ]));
   704        let _route = create(&client, route.build()).await;
   705
   706        let config = rx
   707            .next()
   708            .await
   709            .expect("watch must not fail")
   710            .expect("watch must return an updated config");
   711        tracing::trace!(?config);
   712
   713        // There should be a route with filters.
   714        detect_http_routes(&config, |routes| {
   715            let route = assert_singleton(routes);
   716            let rule = assert_singleton(&route.rules);
   717            let filters = &rule.filters;
   718            assert_eq!(
   719                *filters,
   720                vec![
   721                    grpc::outbound::http_route::Filter {
   722                        kind: Some(
   723                            grpc::outbound::http_route::filter::Kind::RequestHeaderModifier(
   724                                grpc::http_route::RequestHeaderModifier {
   725                                    add: Some(grpc::http_types::Headers {
   726                                        headers: vec![grpc::http_types::headers::Header {
   727                                            name: "add".to_string(),
   728                                            value: "add-value".into(),
   729                                        }]
   730                                    }),
   731                                    set: Some(grpc::http_types::Headers {
   732                                        headers: vec![grpc::http_types::headers::Header {
   733                                            name: "set".to_string(),
   734                                            value: "set-value".into(),
   735                                        }]
   736                                    }),
   737                                    remove: vec!["remove".to_string()],
   738                                }
   739                            )
   740                        )
   741                    },
   742                    grpc::outbound::http_route::Filter {
   743                        kind: Some(grpc::outbound::http_route::filter::Kind::Redirect(
   744                            grpc::http_route::RequestRedirect {
   745                                scheme: Some(grpc::http_types::Scheme {
   746                                    r#type: Some(grpc::http_types::scheme::Type::Registered(
   747                                        grpc::http_types::scheme::Registered::Http.into(),
   748                                    ))
   749                                }),
   750                                host: "host".to_string(),
   751                                path: Some(linkerd2_proxy_api::http_route::PathModifier { replace: Some(linkerd2_proxy_api::http_route::path_modifier::Replace::Prefix("/path".to_string())) }),
   752                                port: 5555,
   753                                status: 302,
   754                            }
   755                        ))
   756                    }
   757                ]
   758            );
   759        });
   760    })
   761    .await;
   762}
   763
   764#[tokio::test(flavor = "current_thread")]
   765async fn backend_with_filters() {
   766    with_temp_ns(|client, ns| async move {
   767        // Create a service
   768        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   769
   770        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   771        let config = rx
   772            .next()
   773            .await
   774            .expect("watch must not fail")
   775            .expect("watch must return an initial config");
   776        tracing::trace!(?config);
   777
   778        // There should be a default route.
   779        detect_http_routes(&config, |routes| {
   780            let route = assert_singleton(routes);
   781            assert_route_is_default(route, &svc, 4191);
   782        });
   783
   784        let backend_name = "backend";
   785        let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
   786        let backends = [backend_name];
   787        let route = mk_http_route(&ns, "foo-route", &svc, Some(4191))
   788            .with_backends(Some(&backends), None, Some(vec![
   789                k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
   790                    request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
   791                        set: Some(vec![k8s_gateway_api::HttpHeader {
   792                            name: "set".to_string(),
   793                            value: "set-value".to_string(),
   794                        }]),
   795                        add: Some(vec![k8s_gateway_api::HttpHeader {
   796                            name: "add".to_string(),
   797                            value: "add-value".to_string(),
   798                        }]),
   799                        remove: Some(vec!["remove".to_string()]),
   800                    },
   801                },
   802                k8s_gateway_api::HttpRouteFilter::RequestRedirect {
   803                    request_redirect: k8s_gateway_api::HttpRequestRedirectFilter {
   804                        scheme: Some("http".to_string()),
   805                        hostname: Some("host".to_string()),
   806                        path: Some(k8s_gateway_api::HttpPathModifier::ReplacePrefixMatch {
   807                            replace_prefix_match: "/path".to_string(),
   808                        }),
   809                        port: Some(5555),
   810                        status_code: Some(302),
   811                    },
   812                },
   813            ]));
   814        let _route = create(&client, route.build()).await;
   815
   816        let config = rx
   817            .next()
   818            .await
   819            .expect("watch must not fail")
   820            .expect("watch must return an updated config");
   821        tracing::trace!(?config);
   822
   823        // There should be a route without rule filters.
   824        detect_http_routes(&config, |routes| {
   825            let route = assert_singleton(routes);
   826            let rule = assert_singleton(&route.rules);
   827            assert_eq!(rule.filters.len(), 0);
   828            let backends = route_backends_random_available(route);
   829            let backend = assert_singleton(backends);
   830            assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
   831            let filters = &backend.backend.as_ref().unwrap().filters;
   832            assert_eq!(
   833                *filters,
   834                vec![
   835                    grpc::outbound::http_route::Filter {
   836                        kind: Some(
   837                            grpc::outbound::http_route::filter::Kind::RequestHeaderModifier(
   838                                grpc::http_route::RequestHeaderModifier {
   839                                    add: Some(grpc::http_types::Headers {
   840                                        headers: vec![grpc::http_types::headers::Header {
   841                                            name: "add".to_string(),
   842                                            value: "add-value".into(),
   843                                        }]
   844                                    }),
   845                                    set: Some(grpc::http_types::Headers {
   846                                        headers: vec![grpc::http_types::headers::Header {
   847                                            name: "set".to_string(),
   848                                            value: "set-value".into(),
   849                                        }]
   850                                    }),
   851                                    remove: vec!["remove".to_string()],
   852                                }
   853                            )
   854                        )
   855                    },
   856                    grpc::outbound::http_route::Filter {
   857                        kind: Some(grpc::outbound::http_route::filter::Kind::Redirect(
   858                            grpc::http_route::RequestRedirect {
   859                                scheme: Some(grpc::http_types::Scheme {
   860                                    r#type: Some(grpc::http_types::scheme::Type::Registered(
   861                                        grpc::http_types::scheme::Registered::Http.into(),
   862                                    ))
   863                                }),
   864                                host: "host".to_string(),
   865                                path: Some(linkerd2_proxy_api::http_route::PathModifier { replace: Some(linkerd2_proxy_api::http_route::path_modifier::Replace::Prefix("/path".to_string())) }),
   866                                port: 5555,
   867                                status: 302,
   868                            }
   869                        ))
   870                    }
   871                ]
   872            );
   873        });
   874    })
   875    .await;
   876}
   877
   878#[tokio::test(flavor = "current_thread")]
   879async fn http_route_with_no_port() {
   880    with_temp_ns(|client, ns| async move {
   881        // Create a service
   882        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   883
   884        let mut rx_4191 = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   885        let config_4191 = rx_4191
   886            .next()
   887            .await
   888            .expect("watch must not fail")
   889            .expect("watch must return an initial config");
   890        tracing::trace!(?config_4191);
   891
   892        let mut rx_9999 = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
   893        let config_9999 = rx_9999
   894            .next()
   895            .await
   896            .expect("watch must not fail")
   897            .expect("watch must return an initial config");
   898        tracing::trace!(?config_9999);
   899
   900        // There should be a default route.
   901        detect_http_routes(&config_4191, |routes| {
   902            let route = assert_singleton(routes);
   903            assert_route_is_default(route, &svc, 4191);
   904        });
   905        detect_http_routes(&config_9999, |routes| {
   906            let route = assert_singleton(routes);
   907            assert_route_is_default(route, &svc, 9999);
   908        });
   909
   910        let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;
   911
   912        let config_4191 = rx_4191
   913            .next()
   914            .await
   915            .expect("watch must not fail")
   916            .expect("watch must return an updated config");
   917        tracing::trace!(?config_4191);
   918
   919        // The route should apply to the service.
   920        detect_http_routes(&config_4191, |routes| {
   921            let route = assert_singleton(routes);
   922            assert_route_name_eq(route, "foo-route");
   923        });
   924
   925        let config_9999 = rx_9999
   926            .next()
   927            .await
   928            .expect("watch must not fail")
   929            .expect("watch must return an updated config");
   930        tracing::trace!(?config_9999);
   931
   932        // The route should apply to other ports too.
   933        detect_http_routes(&config_9999, |routes| {
   934            let route = assert_singleton(routes);
   935            assert_route_name_eq(route, "foo-route");
   936        });
   937    })
   938    .await;
   939}
   940
   941#[tokio::test(flavor = "current_thread")]
   942async fn producer_route() {
   943    with_temp_ns(|client, ns| async move {
   944        // Create a service
   945        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   946
   947        let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   948        let producer_config = producer_rx
   949            .next()
   950            .await
   951            .expect("watch must not fail")
   952            .expect("watch must return an initial config");
   953        tracing::trace!(?producer_config);
   954
   955        let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
   956        let consumer_config = consumer_rx
   957            .next()
   958            .await
   959            .expect("watch must not fail")
   960            .expect("watch must return an initial config");
   961        tracing::trace!(?consumer_config);
   962
   963        // There should be a default route.
   964        detect_http_routes(&producer_config, |routes| {
   965            let route = assert_singleton(routes);
   966            assert_route_is_default(route, &svc, 4191);
   967        });
   968        detect_http_routes(&consumer_config, |routes| {
   969            let route = assert_singleton(routes);
   970            assert_route_is_default(route, &svc, 4191);
   971        });
   972
   973        // A route created in the same namespace as its parent service is called
   974        // a producer route. It should be returned in outbound policy requests
   975        // for that service from ALL namespaces.
   976        let _route = create(
   977            &client,
   978            mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
   979        )
   980        .await;
   981
   982        let producer_config = producer_rx
   983            .next()
   984            .await
   985            .expect("watch must not fail")
   986            .expect("watch must return an updated config");
   987        tracing::trace!(?producer_config);
   988        let consumer_config = consumer_rx
   989            .next()
   990            .await
   991            .expect("watch must not fail")
   992            .expect("watch must return an initial config");
   993        tracing::trace!(?consumer_config);
   994
   995        // The route should be returned in queries from the producer namespace.
   996        detect_http_routes(&producer_config, |routes| {
   997            let route = assert_singleton(routes);
   998            assert_route_name_eq(route, "foo-route");
   999        });
  1000
  1001        // The route should be returned in queries from a consumer namespace.
  1002        detect_http_routes(&consumer_config, |routes| {
  1003            let route = assert_singleton(routes);
  1004            assert_route_name_eq(route, "foo-route");
  1005        });
  1006    })
  1007    .await;
  1008}
  1009
  1010#[tokio::test(flavor = "current_thread")]
  1011async fn pre_existing_producer_route() {
  1012    // We test the scenario where outbound policy watches are initiated after
  1013    // a produce route already exists.
  1014    with_temp_ns(|client, ns| async move {
  1015        // Create a service
  1016        let svc = create_service(&client, &ns, "my-svc", 4191).await;
  1017
  1018        // A route created in the same namespace as its parent service is called
  1019        // a producer route. It should be returned in outbound policy requests
  1020        // for that service from ALL namespaces.
  1021        let _route = create(
  1022            &client,
  1023            mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
  1024        )
  1025        .await;
  1026
  1027        let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
  1028        let producer_config = producer_rx
  1029            .next()
  1030            .await
  1031            .expect("watch must not fail")
  1032            .expect("watch must return an initial config");
  1033        tracing::trace!(?producer_config);
  1034
  1035        let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
  1036        let consumer_config = consumer_rx
  1037            .next()
  1038            .await
  1039            .expect("watch must not fail")
  1040            .expect("watch must return an initial config");
  1041        tracing::trace!(?consumer_config);
  1042
  1043        // The route should be returned in queries from the producer namespace.
  1044        detect_http_routes(&producer_config, |routes| {
  1045            let route = assert_singleton(routes);
  1046            assert_route_name_eq(route, "foo-route");
  1047        });
  1048
  1049        // The route should be returned in queries from a consumer namespace.
  1050        detect_http_routes(&consumer_config, |routes| {
  1051            let route = assert_singleton(routes);
  1052            assert_route_name_eq(route, "foo-route");
  1053        });
  1054    })
  1055    .await;
  1056}
  1057
  1058#[tokio::test(flavor = "current_thread")]
  1059async fn consumer_route() {
  1060    with_temp_ns(|client, ns| async move {
  1061        // Create a service
  1062        let svc = create_service(&client, &ns, "my-svc", 4191).await;
  1063
  1064        let consumer_ns_name = format!("{}-consumer", ns);
  1065        let consumer_ns = create_cluster_scoped(
  1066            &client,
  1067            k8s::Namespace {
  1068                metadata: k8s::ObjectMeta {
  1069                    name: Some(consumer_ns_name.clone()),
  1070                    labels: Some(convert_args!(btreemap!(
  1071                        "linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
  1072                    ))),
  1073                    ..Default::default()
  1074                },
  1075                ..Default::default()
  1076            },
  1077        )
  1078        .await;
  1079
  1080        let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
  1081        let producer_config = producer_rx
  1082            .next()
  1083            .await
  1084            .expect("watch must not fail")
  1085            .expect("watch must return an initial config");
  1086        tracing::trace!(?producer_config);
  1087
  1088        let mut consumer_rx =
  1089            retry_watch_outbound_policy(&client, &consumer_ns_name, &svc, 4191).await;
  1090        let consumer_config = consumer_rx
  1091            .next()
  1092            .await
  1093            .expect("watch must not fail")
  1094            .expect("watch must return an initial config");
  1095        tracing::trace!(?consumer_config);
  1096
  1097        let mut other_rx = retry_watch_outbound_policy(&client, "other_ns", &svc, 4191).await;
  1098        let other_config = other_rx
  1099            .next()
  1100            .await
  1101            .expect("watch must not fail")
  1102            .expect("watch must return an initial config");
  1103        tracing::trace!(?other_config);
  1104
  1105        // There should be a default route.
  1106        detect_http_routes(&producer_config, |routes| {
  1107            let route = assert_singleton(routes);
  1108            assert_route_is_default(route, &svc, 4191);
  1109        });
  1110        detect_http_routes(&consumer_config, |routes| {
  1111            let route = assert_singleton(routes);
  1112            assert_route_is_default(route, &svc, 4191);
  1113        });
  1114        detect_http_routes(&other_config, |routes| {
  1115            let route = assert_singleton(routes);
  1116            assert_route_is_default(route, &svc, 4191);
  1117        });
  1118
  1119        // A route created in a different namespace as its parent service is
  1120        // called a consumer route. It should be returned in outbound policy
  1121        // requests for that service ONLY when the request comes from the
  1122        // consumer namespace.
  1123        let _route = create(
  1124            &client,
  1125            mk_http_route(&consumer_ns_name, "foo-route", &svc, Some(4191)).build(),
  1126        )
  1127        .await;
  1128
  1129        // The route should NOT be returned in queries from the producer namespace.
  1130        // There should be a default route.
  1131        assert!(producer_rx.next().now_or_never().is_none());
  1132
  1133        // The route should be returned in queries from the same consumer
  1134        // namespace.
  1135        let consumer_config = consumer_rx
  1136            .next()
  1137            .await
  1138            .expect("watch must not fail")
  1139            .expect("watch must return an initial config");
  1140        tracing::trace!(?consumer_config);
  1141
  1142        detect_http_routes(&consumer_config, |routes| {
  1143            let route = assert_singleton(routes);
  1144            assert_route_name_eq(route, "foo-route");
  1145        });
  1146
  1147        // The route should NOT be returned in queries from a different consumer
  1148        // namespace.
  1149        assert!(other_rx.next().now_or_never().is_none());
  1150
  1151        delete_cluster_scoped(&client, consumer_ns).await;
  1152    })
  1153    .await;
  1154}
  1155
  1156/* Helpers */
  1157
// HttpRouteBuilder wraps a Gateway API HttpRoute so that tests can adjust its
// backends and filters through chained calls before unwrapping it with
// `build`.
struct HttpRouteBuilder(k8s_gateway_api::HttpRoute);
  1159
  1160async fn retry_watch_outbound_policy(
  1161    client: &kube::Client,
  1162    ns: &str,
  1163    svc: &k8s::Service,
  1164    port: u16,
  1165) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
  1166    // Port-forward to the control plane and start watching the service's
  1167    // outbound policy.
  1168    let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
  1169    loop {
  1170        match policy_api.watch(ns, svc, port).await {
  1171            Ok(rx) => return rx,
  1172            Err(error) => {
  1173                tracing::error!(
  1174                    ?error,
  1175                    ns,
  1176                    svc = svc.name_unchecked(),
  1177                    "failed to watch outbound policy for port 4191"
  1178                );
  1179                time::sleep(Duration::from_secs(1)).await;
  1180            }
  1181        }
  1182    }
  1183}
  1184
  1185fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
  1186    use k8s_gateway_api as api;
  1187
  1188    HttpRouteBuilder(api::HttpRoute {
  1189        metadata: kube::api::ObjectMeta {
  1190            namespace: Some(ns.to_string()),
  1191            name: Some(name.to_string()),
  1192            ..Default::default()
  1193        },
  1194        spec: api::HttpRouteSpec {
  1195            inner: api::CommonRouteSpec {
  1196                parent_refs: Some(vec![api::ParentReference {
  1197                    group: Some("core".to_string()),
  1198                    kind: Some("Service".to_string()),
  1199                    namespace: svc.namespace(),
  1200                    name: svc.name_unchecked(),
  1201                    section_name: None,
  1202                    port,
  1203                }]),
  1204            },
  1205            hostnames: None,
  1206            rules: Some(vec![api::HttpRouteRule {
  1207                matches: Some(vec![api::HttpRouteMatch {
  1208                    path: Some(api::HttpPathMatch::Exact {
  1209                        value: "/foo".to_string(),
  1210                    }),
  1211                    headers: None,
  1212                    query_params: None,
  1213                    method: Some("GET".to_string()),
  1214                }]),
  1215                filters: None,
  1216                backend_refs: None,
  1217            }]),
  1218        },
  1219        status: None,
  1220    })
  1221}
  1222
  1223impl HttpRouteBuilder {
  1224    fn with_backends(
  1225        self,
  1226        backends: Option<&[&str]>,
  1227        backends_ns: Option<String>,
  1228        backend_filters: Option<Vec<k8s_gateway_api::HttpRouteFilter>>,
  1229    ) -> Self {
  1230        let mut route = self.0;
  1231        let backend_refs = backends.map(|names| {
  1232            names
  1233                .iter()
  1234                .map(|name| k8s_gateway_api::HttpBackendRef {
  1235                    backend_ref: Some(k8s_gateway_api::BackendRef {
  1236                        weight: None,
  1237                        inner: k8s_gateway_api::BackendObjectReference {
  1238                            name: name.to_string(),
  1239                            port: Some(8888),
  1240                            group: None,
  1241                            kind: None,
  1242                            namespace: backends_ns.clone(),
  1243                        },
  1244                    }),
  1245                    filters: backend_filters.clone(),
  1246                })
  1247                .collect()
  1248        });
  1249        route.spec.rules.iter_mut().flatten().for_each(|rule| {
  1250            rule.backend_refs = backend_refs.clone();
  1251        });
  1252        Self(route)
  1253    }
  1254
  1255    fn with_filters(self, filters: Option<Vec<k8s_gateway_api::HttpRouteFilter>>) -> Self {
  1256        let mut route = self.0;
  1257        route
  1258            .spec
  1259            .rules
  1260            .iter_mut()
  1261            .flatten()
  1262            .for_each(|rule| rule.filters = filters.clone());
  1263        Self(route)
  1264    }
  1265
  1266    fn build(self) -> k8s_gateway_api::HttpRoute {
  1267        self.0
  1268    }
  1269}
  1270
  1271fn mk_empty_http_route(
  1272    ns: &str,
  1273    name: &str,
  1274    svc: &k8s::Service,
  1275    port: u16,
  1276) -> k8s_gateway_api::HttpRoute {
  1277    use k8s_gateway_api as api;
  1278    api::HttpRoute {
  1279        metadata: kube::api::ObjectMeta {
  1280            namespace: Some(ns.to_string()),
  1281            name: Some(name.to_string()),
  1282            ..Default::default()
  1283        },
  1284        spec: api::HttpRouteSpec {
  1285            inner: api::CommonRouteSpec {
  1286                parent_refs: Some(vec![api::ParentReference {
  1287                    group: Some("core".to_string()),
  1288                    kind: Some("Service".to_string()),
  1289                    namespace: svc.namespace(),
  1290                    name: svc.name_unchecked(),
  1291                    section_name: None,
  1292                    port: Some(port),
  1293                }]),
  1294            },
  1295            hostnames: None,
  1296            rules: Some(vec![]),
  1297        },
  1298        status: None,
  1299    }
  1300}
  1301
  1302// detect_http_routes asserts that the given outbound policy has a proxy protcol
  1303// of "Detect" and then invokes the given function with the Http1 and Http2
  1304// routes from the Detect.
  1305#[track_caller]
  1306fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
  1307where
  1308    F: Fn(&[grpc::outbound::HttpRoute]),
  1309{
  1310    let kind = config
  1311        .protocol
  1312        .as_ref()
  1313        .expect("must have proxy protocol")
  1314        .kind
  1315        .as_ref()
  1316        .expect("must have kind");
  1317    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
  1318        opaque: _,
  1319        timeout: _,
  1320        http1,
  1321        http2,
  1322    }) = kind
  1323    {
  1324        let http1 = http1
  1325            .as_ref()
  1326            .expect("proxy protocol must have http1 field");
  1327        let http2 = http2
  1328            .as_ref()
  1329            .expect("proxy protocol must have http2 field");
  1330        f(&http1.routes);
  1331        f(&http2.routes);
  1332    } else {
  1333        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
  1334    }
  1335}
  1336
  1337#[track_caller]
  1338fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
  1339where
  1340    F: Fn(Option<&grpc::outbound::FailureAccrual>),
  1341{
  1342    let kind = config
  1343        .protocol
  1344        .as_ref()
  1345        .expect("must have proxy protocol")
  1346        .kind
  1347        .as_ref()
  1348        .expect("must have kind");
  1349    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
  1350        opaque: _,
  1351        timeout: _,
  1352        http1,
  1353        http2,
  1354    }) = kind
  1355    {
  1356        let http1 = http1
  1357            .as_ref()
  1358            .expect("proxy protocol must have http1 field");
  1359        let http2 = http2
  1360            .as_ref()
  1361            .expect("proxy protocol must have http2 field");
  1362        f(http1.failure_accrual.as_ref());
  1363        f(http2.failure_accrual.as_ref());
  1364    } else {
  1365        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
  1366    }
  1367}
  1368
  1369#[track_caller]
  1370fn failure_accrual_consecutive(
  1371    accrual: Option<&grpc::outbound::FailureAccrual>,
  1372) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
  1373    assert!(
  1374        accrual.is_some(),
  1375        "failure accrual must be configured for service"
  1376    );
  1377    let kind = accrual
  1378        .unwrap()
  1379        .kind
  1380        .as_ref()
  1381        .expect("failure accrual must have kind");
  1382    let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
  1383    accrual
  1384}
  1385
  1386#[track_caller]
  1387fn route_backends_first_available(
  1388    route: &grpc::outbound::HttpRoute,
  1389) -> &[grpc::outbound::http_route::RouteBackend] {
  1390    let kind = assert_singleton(&route.rules)
  1391        .backends
  1392        .as_ref()
  1393        .expect("Rule must have backends")
  1394        .kind
  1395        .as_ref()
  1396        .expect("Backend must have kind");
  1397    match kind {
  1398        grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
  1399        _ => panic!("Distribution must be FirstAvailable"),
  1400    }
  1401}
  1402
  1403#[track_caller]
  1404fn route_backends_random_available(
  1405    route: &grpc::outbound::HttpRoute,
  1406) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
  1407    let kind = assert_singleton(&route.rules)
  1408        .backends
  1409        .as_ref()
  1410        .expect("Rule must have backends")
  1411        .kind
  1412        .as_ref()
  1413        .expect("Backend must have kind");
  1414    match kind {
  1415        grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
  1416        _ => panic!("Distribution must be RandomAvailable"),
  1417    }
  1418}
  1419
  1420#[track_caller]
  1421fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
  1422    match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
  1423        grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
  1424        _ => panic!("route must be a resource kind"),
  1425    }
  1426}
  1427
  1428#[track_caller]
  1429fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::WeightedRouteBackend) {
  1430    let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
  1431    match filter.kind.as_ref().unwrap() {
  1432        grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
  1433        _ => panic!("backend must have FailureInjector filter"),
  1434    };
  1435}
  1436
  1437#[track_caller]
  1438fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
  1439    let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
  1440    match kind {
  1441        grpc::meta::metadata::Kind::Default(_) => {}
  1442        grpc::meta::metadata::Kind::Resource(r) => {
  1443            panic!("route expected to be default but got resource {r:?}")
  1444        }
  1445    }
  1446
  1447    let backends = route_backends_first_available(route);
  1448    let backend = assert_singleton(backends);
  1449    assert_backend_matches_service(backend, svc, port);
  1450
  1451    let rule = assert_singleton(&route.rules);
  1452    let route_match = assert_singleton(&rule.matches);
  1453    let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
  1454    assert_eq!(
  1455        *path_match,
  1456        grpc::http_route::path_match::Kind::Prefix("/".to_string())
  1457    );
  1458}
  1459
  1460#[track_caller]
  1461fn assert_backend_matches_service(
  1462    backend: &grpc::outbound::http_route::RouteBackend,
  1463    svc: &k8s::Service,
  1464    port: u16,
  1465) {
  1466    let backend = backend.backend.as_ref().unwrap();
  1467    let dst = match backend.kind.as_ref().unwrap() {
  1468        grpc::outbound::backend::Kind::Balancer(balance) => {
  1469            let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
  1470            match kind {
  1471                grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
  1472            }
  1473        }
  1474        grpc::outbound::backend::Kind::Forward(_) => {
  1475            panic!("default route backend must be Balancer")
  1476        }
  1477    };
  1478    assert_eq!(
  1479        *dst,
  1480        format!(
  1481            "{}.{}.svc.{}:{}",
  1482            svc.name_unchecked(),
  1483            svc.namespace().unwrap(),
  1484            "cluster.local",
  1485            port
  1486        )
  1487    );
  1488
  1489    assert_svc_meta(&backend.metadata, svc, port)
  1490}
  1491
  1492#[track_caller]
  1493fn assert_singleton<T>(ts: &[T]) -> &T {
  1494    assert_eq!(ts.len(), 1);
  1495    ts.first().unwrap()
  1496}
  1497
  1498#[track_caller]
  1499fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
  1500    let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
  1501    match kind {
  1502        grpc::meta::metadata::Kind::Default(d) => {
  1503            panic!("route expected to not be default, but got default {d:?}")
  1504        }
  1505        grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
  1506    }
  1507}

View as plain text