...

Text file src/github.com/linkerd/linkerd2/policy-test/tests/outbound_api_linkerd.rs

Documentation: github.com/linkerd/linkerd2/policy-test/tests

     1use std::{collections::BTreeMap, time::Duration};
     2
     3use futures::prelude::*;
     4use kube::ResourceExt;
     5use linkerd_policy_controller_k8s_api as k8s;
     6use linkerd_policy_test::{
     7    assert_default_accrual_backoff, assert_svc_meta, create, create_annotated_service,
     8    create_cluster_scoped, create_opaque_service, create_service, delete_cluster_scoped, grpc,
     9    mk_service, with_temp_ns,
    10};
    11use maplit::{btreemap, convert_args};
    12use tokio::time;
    13
    14// These tests are copies of the tests in outbound_api_gateway.rs but using the
    15// policy.linkerd.io HttpRoute kubernetes types instead of the Gateway API ones.
    16// These two files should be kept in sync to ensure that Linkerd can read and
    17// function correctly with both types of resources.
    18
    19#[tokio::test(flavor = "current_thread")]
    20async fn service_does_not_exist() {
    21    with_temp_ns(|client, ns| async move {
    22        // Build a service but don't apply it to the cluster.
    23        let mut svc = mk_service(&ns, "my-svc", 4191);
    24        // Give it a bogus cluster ip.
    25        svc.spec.as_mut().unwrap().cluster_ip = Some("1.1.1.1".to_string());
    26
    27        let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(&client).await;
    28        let rsp = policy_api.watch(&ns, &svc, 4191).await;
    29
    30        assert!(rsp.is_err());
    31        assert_eq!(rsp.err().unwrap().code(), tonic::Code::NotFound);
    32    })
    33    .await;
    34}
    35
    36#[tokio::test(flavor = "current_thread")]
    37async fn service_with_no_http_routes() {
    38    with_temp_ns(|client, ns| async move {
    39        // Create a service
    40        let svc = create_service(&client, &ns, "my-svc", 4191).await;
    41
    42        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
    43        let config = rx
    44            .next()
    45            .await
    46            .expect("watch must not fail")
    47            .expect("watch must return an initial config");
    48        tracing::trace!(?config);
    49
    50        assert_svc_meta(&config.metadata, &svc, 4191);
    51
    52        // There should be a default route.
    53        detect_http_routes(&config, |routes| {
    54            let route = assert_singleton(routes);
    55            assert_route_is_default(route, &svc, 4191);
    56        });
    57    })
    58    .await;
    59}
    60
    61#[tokio::test(flavor = "current_thread")]
    62async fn service_with_http_route_without_rules() {
    63    with_temp_ns(|client, ns| async move {
    64        // Create a service
    65        let svc = create_service(&client, &ns, "my-svc", 4191).await;
    66
    67        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
    68        let config = rx
    69            .next()
    70            .await
    71            .expect("watch must not fail")
    72            .expect("watch must return an initial config");
    73        tracing::trace!(?config);
    74
    75        assert_svc_meta(&config.metadata, &svc, 4191);
    76
    77        // There should be a default route.
    78        detect_http_routes(&config, |routes| {
    79            let route = assert_singleton(routes);
    80            assert_route_is_default(route, &svc, 4191);
    81        });
    82
    83        let _route = create(&client, mk_empty_http_route(&ns, "foo-route", &svc, 4191)).await;
    84
    85        let config = rx
    86            .next()
    87            .await
    88            .expect("watch must not fail")
    89            .expect("watch must return an updated config");
    90        tracing::trace!(?config);
    91
    92        assert_svc_meta(&config.metadata, &svc, 4191);
    93
    94        // There should be a route with no rules.
    95        detect_http_routes(&config, |routes| {
    96            let route = assert_singleton(routes);
    97            assert_eq!(route.rules.len(), 0);
    98        });
    99    })
   100    .await;
   101}
   102
   103#[tokio::test(flavor = "current_thread")]
   104async fn service_with_http_routes_without_backends() {
   105    with_temp_ns(|client, ns| async move {
   106        // Create a service
   107        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   108
   109        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   110        let config = rx
   111            .next()
   112            .await
   113            .expect("watch must not fail")
   114            .expect("watch must return an initial config");
   115        tracing::trace!(?config);
   116
   117        assert_svc_meta(&config.metadata, &svc, 4191);
   118
   119        // There should be a default route.
   120        detect_http_routes(&config, |routes| {
   121            let route = assert_singleton(routes);
   122            assert_route_is_default(route, &svc, 4191);
   123        });
   124
   125        let _route = create(
   126            &client,
   127            mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
   128        )
   129        .await;
   130
   131        let config = rx
   132            .next()
   133            .await
   134            .expect("watch must not fail")
   135            .expect("watch must return an updated config");
   136        tracing::trace!(?config);
   137
   138        assert_svc_meta(&config.metadata, &svc, 4191);
   139
   140        // There should be a route with the logical backend.
   141        detect_http_routes(&config, |routes| {
   142            let route = assert_singleton(routes);
   143            let backends = route_backends_first_available(route);
   144            let backend = assert_singleton(backends);
   145            assert_backend_matches_service(backend, &svc, 4191);
   146        });
   147    })
   148    .await;
   149}
   150
   151#[tokio::test(flavor = "current_thread")]
   152async fn service_with_http_routes_with_backend() {
   153    with_temp_ns(|client, ns| async move {
   154        // Create a service
   155        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   156
   157        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   158        let config = rx
   159            .next()
   160            .await
   161            .expect("watch must not fail")
   162            .expect("watch must return an initial config");
   163        tracing::trace!(?config);
   164
   165        assert_svc_meta(&config.metadata, &svc, 4191);
   166
   167        // There should be a default route.
   168        detect_http_routes(&config, |routes| {
   169            let route = assert_singleton(routes);
   170            assert_route_is_default(route, &svc, 4191);
   171        });
   172
   173        let backend_name = "backend";
   174        let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
   175        let backends = [backend_name];
   176        let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
   177            Some(&backends),
   178            None,
   179            None,
   180        );
   181        let _route = create(&client, route.build()).await;
   182
   183        let config = rx
   184            .next()
   185            .await
   186            .expect("watch must not fail")
   187            .expect("watch must return an updated config");
   188        tracing::trace!(?config);
   189
   190        assert_svc_meta(&config.metadata, &svc, 4191);
   191
   192        // There should be a route with a backend with no filters.
   193        detect_http_routes(&config, |routes| {
   194            let route = assert_singleton(routes);
   195            let backends = route_backends_random_available(route);
   196            let backend = assert_singleton(backends);
   197            assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
   198            let filters = &backend.backend.as_ref().unwrap().filters;
   199            assert_eq!(filters.len(), 0);
   200        });
   201    })
   202    .await;
   203}
   204
   205#[tokio::test(flavor = "current_thread")]
   206async fn service_with_http_routes_with_cross_namespace_backend() {
   207    with_temp_ns(|client, ns| async move {
   208        // Create a service
   209        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   210
   211        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   212        let config = rx
   213            .next()
   214            .await
   215            .expect("watch must not fail")
   216            .expect("watch must return an initial config");
   217        tracing::trace!(?config);
   218
   219        assert_svc_meta(&config.metadata, &svc, 4191);
   220
   221        // There should be a default route.
   222        detect_http_routes(&config, |routes| {
   223            let route = assert_singleton(routes);
   224            assert_route_is_default(route, &svc, 4191);
   225        });
   226
   227        let backend_ns_name = format!("{}-backend", ns);
   228        let backend_ns = create_cluster_scoped(
   229            &client,
   230            k8s::Namespace {
   231                metadata: k8s::ObjectMeta {
   232                    name: Some(backend_ns_name.clone()),
   233                    labels: Some(convert_args!(btreemap!(
   234                        "linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
   235                    ))),
   236                    ..Default::default()
   237                },
   238                ..Default::default()
   239            },
   240        )
   241        .await;
   242        let backend_name = "backend";
   243        let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await;
   244        let backends = [backend_name];
   245        let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
   246            Some(&backends),
   247            Some(backend_ns_name),
   248            None,
   249        );
   250        let _route = create(&client, route.build()).await;
   251
   252        let config = rx
   253            .next()
   254            .await
   255            .expect("watch must not fail")
   256            .expect("watch must return an updated config");
   257        tracing::trace!(?config);
   258
   259        assert_svc_meta(&config.metadata, &svc, 4191);
   260
   261        // There should be a route with a backend with no filters.
   262        detect_http_routes(&config, |routes| {
   263            let route = assert_singleton(routes);
   264            let backends = route_backends_random_available(route);
   265            let backend = assert_singleton(backends);
   266            assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
   267            let filters = &backend.backend.as_ref().unwrap().filters;
   268            assert_eq!(filters.len(), 0);
   269        });
   270
   271        delete_cluster_scoped(&client, backend_ns).await
   272    })
   273    .await;
   274}
   275
   276// TODO: Test fails until handling of invalid backends is implemented.
   277#[tokio::test(flavor = "current_thread")]
   278async fn service_with_http_routes_with_invalid_backend() {
   279    with_temp_ns(|client, ns| async move {
   280        // Create a service
   281        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   282
   283        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   284        let config = rx
   285            .next()
   286            .await
   287            .expect("watch must not fail")
   288            .expect("watch must return an initial config");
   289        tracing::trace!(?config);
   290
   291        assert_svc_meta(&config.metadata, &svc, 4191);
   292
   293        // There should be a default route.
   294        detect_http_routes(&config, |routes| {
   295            let route = assert_singleton(routes);
   296            assert_route_is_default(route, &svc, 4191);
   297        });
   298
   299        let backends = ["invalid-backend"];
   300        let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
   301            Some(&backends),
   302            None,
   303            None,
   304        );
   305        let _route = create(&client, route.build()).await;
   306
   307        let config = rx
   308            .next()
   309            .await
   310            .expect("watch must not fail")
   311            .expect("watch must return an updated config");
   312        tracing::trace!(?config);
   313
   314        assert_svc_meta(&config.metadata, &svc, 4191);
   315
   316        // There should be a route with a backend.
   317        detect_http_routes(&config, |routes| {
   318            let route = assert_singleton(routes);
   319            let backends = route_backends_random_available(route);
   320            let backend = assert_singleton(backends);
   321            assert_backend_has_failure_filter(backend);
   322        });
   323    })
   324    .await;
   325}
   326
   327// TODO: Investigate why the policy controller is only returning one route in this
   328// case instead of two.
   329#[tokio::test(flavor = "current_thread")]
   330async fn service_with_multiple_http_routes() {
   331    with_temp_ns(|client, ns| async move {
   332        // Create a service
   333        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   334
   335        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   336        let config = rx
   337            .next()
   338            .await
   339            .expect("watch must not fail")
   340            .expect("watch must return an initial config");
   341        tracing::trace!(?config);
   342
   343        assert_svc_meta(&config.metadata, &svc, 4191);
   344
   345        // There should be a default route.
   346        detect_http_routes(&config, |routes| {
   347            let route = assert_singleton(routes);
   348            assert_route_is_default(route, &svc, 4191);
   349        });
   350
   351        // Routes should be returned in sorted order by creation timestamp then
   352        // name. To ensure that this test isn't timing dependant, routes should
   353        // be created in alphabetical order.
   354        let _a_route = create(
   355            &client,
   356            mk_http_route(&ns, "a-route", &svc, Some(4191)).build(),
   357        )
   358        .await;
   359
   360        // First route update.
   361        let config = rx
   362            .next()
   363            .await
   364            .expect("watch must not fail")
   365            .expect("watch must return an updated config");
   366        tracing::trace!(?config);
   367
   368        assert_svc_meta(&config.metadata, &svc, 4191);
   369
   370        let _b_route = create(
   371            &client,
   372            mk_http_route(&ns, "b-route", &svc, Some(4191)).build(),
   373        )
   374        .await;
   375
   376        // Second route update.
   377        let config = rx
   378            .next()
   379            .await
   380            .expect("watch must not fail")
   381            .expect("watch must return an updated config");
   382        tracing::trace!(?config);
   383
   384        assert_svc_meta(&config.metadata, &svc, 4191);
   385
   386        // There should be 2 routes, returned in order.
   387        detect_http_routes(&config, |routes| {
   388            assert_eq!(routes.len(), 2);
   389            assert_eq!(route_name(&routes[0]), "a-route");
   390            assert_eq!(route_name(&routes[1]), "b-route");
   391        });
   392    })
   393    .await;
   394}
   395
   396#[tokio::test(flavor = "current_thread")]
   397async fn service_with_consecutive_failure_accrual() {
   398    with_temp_ns(|client, ns| async move {
   399        let svc = create_annotated_service(
   400            &client,
   401            &ns,
   402            "consecutive-accrual-svc",
   403            80,
   404            BTreeMap::from([
   405                (
   406                    "balancer.linkerd.io/failure-accrual".to_string(),
   407                    "consecutive".to_string(),
   408                ),
   409                (
   410                    "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
   411                    "8".to_string(),
   412                ),
   413                (
   414                    "balancer.linkerd.io/failure-accrual-consecutive-min-penalty".to_string(),
   415                    "10s".to_string(),
   416                ),
   417                (
   418                    "balancer.linkerd.io/failure-accrual-consecutive-max-penalty".to_string(),
   419                    "10m".to_string(),
   420                ),
   421                (
   422                    "balancer.linkerd.io/failure-accrual-consecutive-jitter-ratio".to_string(),
   423                    "1.0".to_string(),
   424                ),
   425            ]),
   426        )
   427        .await;
   428
   429        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   430        let config = rx
   431            .next()
   432            .await
   433            .expect("watch must not fail")
   434            .expect("watch must return an initial config");
   435        tracing::trace!(?config);
   436
   437        assert_svc_meta(&config.metadata, &svc, 4191);
   438
   439        detect_failure_accrual(&config, |accrual| {
   440            let consecutive = failure_accrual_consecutive(accrual);
   441            assert_eq!(8, consecutive.max_failures);
   442            assert_eq!(
   443                &grpc::outbound::ExponentialBackoff {
   444                    min_backoff: Some(Duration::from_secs(10).try_into().unwrap()),
   445                    max_backoff: Some(Duration::from_secs(600).try_into().unwrap()),
   446                    jitter_ratio: 1.0_f32,
   447                },
   448                consecutive
   449                    .backoff
   450                    .as_ref()
   451                    .expect("backoff must be configured")
   452            );
   453        });
   454    })
   455    .await;
   456}
   457
   458#[tokio::test(flavor = "current_thread")]
   459async fn service_with_consecutive_failure_accrual_defaults() {
   460    with_temp_ns(|client, ns| async move {
   461        // Create a service configured to do consecutive failure accrual, but
   462        // with no additional configuration
   463        let svc = create_annotated_service(
   464            &client,
   465            &ns,
   466            "default-accrual-svc",
   467            80,
   468            BTreeMap::from([(
   469                "balancer.linkerd.io/failure-accrual".to_string(),
   470                "consecutive".to_string(),
   471            )]),
   472        )
   473        .await;
   474
   475        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   476        let config = rx
   477            .next()
   478            .await
   479            .expect("watch must not fail")
   480            .expect("watch must return an initial config");
   481        tracing::trace!(?config);
   482
   483        assert_svc_meta(&config.metadata, &svc, 4191);
   484
   485        // Expect default max_failures and default backoff
   486        detect_failure_accrual(&config, |accrual| {
   487            let consecutive = failure_accrual_consecutive(accrual);
   488            assert_eq!(7, consecutive.max_failures);
   489            assert_default_accrual_backoff!(consecutive
   490                .backoff
   491                .as_ref()
   492                .expect("backoff must be configured"));
   493        });
   494
   495        // Create a service configured to do consecutive failure accrual with
   496        // max number of failures and with default backoff
   497        let svc = create_annotated_service(
   498            &client,
   499            &ns,
   500            "no-backoff-svc",
   501            80,
   502            BTreeMap::from([
   503                (
   504                    "balancer.linkerd.io/failure-accrual".to_string(),
   505                    "consecutive".to_string(),
   506                ),
   507                (
   508                    "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
   509                    "8".to_string(),
   510                ),
   511            ]),
   512        )
   513        .await;
   514
   515        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   516        let config = rx
   517            .next()
   518            .await
   519            .expect("watch must not fail")
   520            .expect("watch must return an initial config");
   521        tracing::trace!(?config);
   522
   523        // Expect default backoff and overridden max_failures
   524        detect_failure_accrual(&config, |accrual| {
   525            let consecutive = failure_accrual_consecutive(accrual);
   526            assert_eq!(8, consecutive.max_failures);
   527            assert_default_accrual_backoff!(consecutive
   528                .backoff
   529                .as_ref()
   530                .expect("backoff must be configured"));
   531        });
   532
   533        // Create a service configured to do consecutive failure accrual with
   534        // only the jitter ratio configured in the backoff
   535        let svc = create_annotated_service(
   536            &client,
   537            &ns,
   538            "only-jitter-svc",
   539            80,
   540            BTreeMap::from([
   541                (
   542                    "balancer.linkerd.io/failure-accrual".to_string(),
   543                    "consecutive".to_string(),
   544                ),
   545                (
   546                    "balancer.linkerd.io/failure-accrual-consecutive-jitter-ratio".to_string(),
   547                    "1.0".to_string(),
   548                ),
   549            ]),
   550        )
   551        .await;
   552
   553        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   554        let config = rx
   555            .next()
   556            .await
   557            .expect("watch must not fail")
   558            .expect("watch must return an initial config");
   559        tracing::trace!(?config);
   560
   561        // Expect defaults for everything except for the jitter ratio
   562        detect_failure_accrual(&config, |accrual| {
   563            let consecutive = failure_accrual_consecutive(accrual);
   564            assert_eq!(7, consecutive.max_failures);
   565            assert_eq!(
   566                &grpc::outbound::ExponentialBackoff {
   567                    min_backoff: Some(Duration::from_secs(1).try_into().unwrap()),
   568                    max_backoff: Some(Duration::from_secs(60).try_into().unwrap()),
   569                    jitter_ratio: 1.0_f32,
   570                },
   571                consecutive
   572                    .backoff
   573                    .as_ref()
   574                    .expect("backoff must be configured")
   575            );
   576        });
   577    })
   578    .await;
   579}
   580
   581#[tokio::test(flavor = "current_thread")]
   582async fn service_with_default_failure_accrual() {
   583    with_temp_ns(|client, ns| async move {
   584        // Default config for Service, no failure accrual
   585        let svc = create_service(&client, &ns, "default-failure-accrual", 80).await;
   586
   587        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   588        let config = rx
   589            .next()
   590            .await
   591            .expect("watch must not fail")
   592            .expect("watch must return an initial config");
   593        tracing::trace!(?config);
   594
   595        // Expect failure accrual config to be default (no failure accrual)
   596        detect_failure_accrual(&config, |accrual| {
   597            assert!(
   598                accrual.is_none(),
   599                "consecutive failure accrual should not be configured for service"
   600            );
   601        });
   602
   603        // Create Service with consecutive failure accrual config for
   604        // max_failures but no mode
   605        let svc = create_annotated_service(
   606            &client,
   607            &ns,
   608            "default-max-failure-svc",
   609            80,
   610            BTreeMap::from([(
   611                "balancer.linkerd.io/failure-accrual-consecutive-max-failures".to_string(),
   612                "8".to_string(),
   613            )]),
   614        )
   615        .await;
   616
   617        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   618        let config = rx
   619            .next()
   620            .await
   621            .expect("watch must not fail")
   622            .expect("watch must return an initial config");
   623        tracing::trace!(?config);
   624
   625        // Expect failure accrual config to be default (no failure accrual)
   626        detect_failure_accrual(&config, |accrual| {
   627            assert!(
   628                accrual.is_none(),
   629                "consecutive failure accrual should not be configured for service"
   630            )
   631        });
   632    })
   633    .await;
   634}
   635
   636#[tokio::test(flavor = "current_thread")]
   637async fn opaque_service() {
   638    with_temp_ns(|client, ns| async move {
   639        // Create a service
   640        let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await;
   641
   642        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   643        let config = rx
   644            .next()
   645            .await
   646            .expect("watch must not fail")
   647            .expect("watch must return an initial config");
   648        tracing::trace!(?config);
   649
   650        assert_svc_meta(&config.metadata, &svc, 4191);
   651
   652        // Proxy protocol should be opaque.
   653        match config.protocol.unwrap().kind.unwrap() {
   654            grpc::outbound::proxy_protocol::Kind::Opaque(_) => {}
   655            _ => panic!("proxy protocol must be Opaque"),
   656        };
   657    })
   658    .await;
   659}
   660
   661#[tokio::test(flavor = "current_thread")]
   662async fn route_rule_with_filters() {
   663    with_temp_ns(|client, ns| async move {
   664        // Create a service
   665        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   666
   667        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   668        let config = rx
   669            .next()
   670            .await
   671            .expect("watch must not fail")
   672            .expect("watch must return an initial config");
   673        tracing::trace!(?config);
   674
   675        // There should be a default route.
   676        detect_http_routes(&config, |routes| {
   677            let route = assert_singleton(routes);
   678            assert_route_is_default(route, &svc, 4191);
   679        });
   680
   681        let backend_name = "backend";
   682        let backends = [backend_name];
   683        let route = mk_http_route(
   684            &ns,
   685            "foo-route",
   686            &svc,
   687            Some(4191),
   688        ).with_backends(Some(&backends), None, None).with_filters(Some(vec![
   689                k8s::policy::httproute::HttpRouteFilter::RequestHeaderModifier {
   690                    request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
   691                        set: Some(vec![k8s_gateway_api::HttpHeader {
   692                            name: "set".to_string(),
   693                            value: "set-value".to_string(),
   694                        }]),
   695                        add: Some(vec![k8s_gateway_api::HttpHeader {
   696                            name: "add".to_string(),
   697                            value: "add-value".to_string(),
   698                        }]),
   699                        remove: Some(vec!["remove".to_string()]),
   700                    },
   701                },
   702                k8s::policy::httproute::HttpRouteFilter::RequestRedirect {
   703                    request_redirect: k8s_gateway_api::HttpRequestRedirectFilter {
   704                        scheme: Some("http".to_string()),
   705                        hostname: Some("host".to_string()),
   706                        path: Some(k8s_gateway_api::HttpPathModifier::ReplacePrefixMatch {
   707                            replace_prefix_match: "/path".to_string(),
   708                        }),
   709                        port: Some(5555),
   710                        status_code: Some(302),
   711                    },
   712                },
   713            ]));
   714        let _route = create(
   715            &client,
   716            route.build(),
   717        )
   718        .await;
   719
   720        let config = rx
   721            .next()
   722            .await
   723            .expect("watch must not fail")
   724            .expect("watch must return an updated config");
   725        tracing::trace!(?config);
   726
   727        // There should be a route with filters.
   728        detect_http_routes(&config, |routes| {
   729            let route = assert_singleton(routes);
   730            let rule = assert_singleton(&route.rules);
   731            let filters = &rule.filters;
   732            assert_eq!(
   733                *filters,
   734                vec![
   735                    grpc::outbound::http_route::Filter {
   736                        kind: Some(
   737                            grpc::outbound::http_route::filter::Kind::RequestHeaderModifier(
   738                                grpc::http_route::RequestHeaderModifier {
   739                                    add: Some(grpc::http_types::Headers {
   740                                        headers: vec![grpc::http_types::headers::Header {
   741                                            name: "add".to_string(),
   742                                            value: "add-value".into(),
   743                                        }]
   744                                    }),
   745                                    set: Some(grpc::http_types::Headers {
   746                                        headers: vec![grpc::http_types::headers::Header {
   747                                            name: "set".to_string(),
   748                                            value: "set-value".into(),
   749                                        }]
   750                                    }),
   751                                    remove: vec!["remove".to_string()],
   752                                }
   753                            )
   754                        )
   755                    },
   756                    grpc::outbound::http_route::Filter {
   757                        kind: Some(grpc::outbound::http_route::filter::Kind::Redirect(
   758                            grpc::http_route::RequestRedirect {
   759                                scheme: Some(grpc::http_types::Scheme {
   760                                    r#type: Some(grpc::http_types::scheme::Type::Registered(
   761                                        grpc::http_types::scheme::Registered::Http.into(),
   762                                    ))
   763                                }),
   764                                host: "host".to_string(),
   765                                path: Some(linkerd2_proxy_api::http_route::PathModifier { replace: Some(linkerd2_proxy_api::http_route::path_modifier::Replace::Prefix("/path".to_string())) }),
   766                                port: 5555,
   767                                status: 302,
   768                            }
   769                        ))
   770                    }
   771                ]
   772            );
   773        });
   774    })
   775    .await;
   776}
   777
   778#[tokio::test(flavor = "current_thread")]
   779async fn backend_with_filters() {
   780    with_temp_ns(|client, ns| async move {
   781        // Create a service
   782        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   783
   784        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   785        let config = rx
   786            .next()
   787            .await
   788            .expect("watch must not fail")
   789            .expect("watch must return an initial config");
   790        tracing::trace!(?config);
   791
   792        assert_svc_meta(&config.metadata, &svc, 4191);
   793
   794        // There should be a default route.
   795        detect_http_routes(&config, |routes| {
   796            let route = assert_singleton(routes);
   797            assert_route_is_default(route, &svc, 4191);
   798        });
   799
   800        let backend_name = "backend";
   801        let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
   802        let backends = [backend_name];
   803        let route = mk_http_route(
   804            &ns,
   805            "foo-route",
   806            &svc,
   807            Some(4191)
   808        ).with_backends(Some(&backends), None, Some(vec![
   809                k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
   810                    request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
   811                        set: Some(vec![k8s_gateway_api::HttpHeader {
   812                            name: "set".to_string(),
   813                            value: "set-value".to_string(),
   814                        }]),
   815                        add: Some(vec![k8s_gateway_api::HttpHeader {
   816                            name: "add".to_string(),
   817                            value: "add-value".to_string(),
   818                        }]),
   819                        remove: Some(vec!["remove".to_string()]),
   820                    },
   821                },
   822                k8s_gateway_api::HttpRouteFilter::RequestRedirect {
   823                    request_redirect: k8s_gateway_api::HttpRequestRedirectFilter {
   824                        scheme: Some("http".to_string()),
   825                        hostname: Some("host".to_string()),
   826                        path: Some(k8s_gateway_api::HttpPathModifier::ReplacePrefixMatch {
   827                            replace_prefix_match: "/path".to_string(),
   828                        }),
   829                        port: Some(5555),
   830                        status_code: Some(302),
   831                    },
   832                },
   833            ]));
   834        let _route = create(&client, route.build())
   835        .await;
   836
   837        let config = rx
   838            .next()
   839            .await
   840            .expect("watch must not fail")
   841            .expect("watch must return an updated config");
   842        tracing::trace!(?config);
   843
   844        assert_svc_meta(&config.metadata, &svc, 4191);
   845
   846        // There should be a route without rule filters.
   847        detect_http_routes(&config, |routes| {
   848            let route = assert_singleton(routes);
   849            let rule = assert_singleton(&route.rules);
   850            assert_eq!(rule.filters.len(), 0);
   851            let backends = route_backends_random_available(route);
   852            let backend = assert_singleton(backends);
   853            assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
   854            let filters = &backend.backend.as_ref().unwrap().filters;
   855            assert_eq!(
   856                *filters,
   857                vec![
   858                    grpc::outbound::http_route::Filter {
   859                        kind: Some(
   860                            grpc::outbound::http_route::filter::Kind::RequestHeaderModifier(
   861                                grpc::http_route::RequestHeaderModifier {
   862                                    add: Some(grpc::http_types::Headers {
   863                                        headers: vec![grpc::http_types::headers::Header {
   864                                            name: "add".to_string(),
   865                                            value: "add-value".into(),
   866                                        }]
   867                                    }),
   868                                    set: Some(grpc::http_types::Headers {
   869                                        headers: vec![grpc::http_types::headers::Header {
   870                                            name: "set".to_string(),
   871                                            value: "set-value".into(),
   872                                        }]
   873                                    }),
   874                                    remove: vec!["remove".to_string()],
   875                                }
   876                            )
   877                        )
   878                    },
   879                    grpc::outbound::http_route::Filter {
   880                        kind: Some(grpc::outbound::http_route::filter::Kind::Redirect(
   881                            grpc::http_route::RequestRedirect {
   882                                scheme: Some(grpc::http_types::Scheme {
   883                                    r#type: Some(grpc::http_types::scheme::Type::Registered(
   884                                        grpc::http_types::scheme::Registered::Http.into(),
   885                                    ))
   886                                }),
   887                                host: "host".to_string(),
   888                                path: Some(linkerd2_proxy_api::http_route::PathModifier { replace: Some(linkerd2_proxy_api::http_route::path_modifier::Replace::Prefix("/path".to_string())) }),
   889                                port: 5555,
   890                                status: 302,
   891                            }
   892                        ))
   893                    }
   894                ]
   895            );
   896        });
   897    })
   898    .await;
   899}
   900
   901#[tokio::test(flavor = "current_thread")]
   902async fn http_route_with_no_port() {
   903    with_temp_ns(|client, ns| async move {
   904        // Create a service
   905        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   906
   907        let mut rx_4191 = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   908        let config_4191 = rx_4191
   909            .next()
   910            .await
   911            .expect("watch must not fail")
   912            .expect("watch must return an initial config");
   913        tracing::trace!(?config_4191);
   914
   915        assert_svc_meta(&config_4191.metadata, &svc, 4191);
   916
   917        let mut rx_9999 = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
   918        let config_9999 = rx_9999
   919            .next()
   920            .await
   921            .expect("watch must not fail")
   922            .expect("watch must return an initial config");
   923        tracing::trace!(?config_9999);
   924
   925        assert_svc_meta(&config_9999.metadata, &svc, 9999);
   926
   927        // There should be a default route.
   928        detect_http_routes(&config_4191, |routes| {
   929            let route = assert_singleton(routes);
   930            assert_route_is_default(route, &svc, 4191);
   931        });
   932        detect_http_routes(&config_9999, |routes| {
   933            let route = assert_singleton(routes);
   934            assert_route_is_default(route, &svc, 9999);
   935        });
   936
   937        let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;
   938
   939        let config_4191 = rx_4191
   940            .next()
   941            .await
   942            .expect("watch must not fail")
   943            .expect("watch must return an updated config");
   944        tracing::trace!(?config_4191);
   945
   946        // The route should apply to the service.
   947        detect_http_routes(&config_4191, |routes| {
   948            let route = assert_singleton(routes);
   949            assert_route_name_eq(route, "foo-route");
   950        });
   951
   952        let config_9999 = rx_9999
   953            .next()
   954            .await
   955            .expect("watch must not fail")
   956            .expect("watch must return an updated config");
   957        tracing::trace!(?config_9999);
   958
   959        // The route should apply to other ports too.
   960        detect_http_routes(&config_9999, |routes| {
   961            let route = assert_singleton(routes);
   962            assert_route_name_eq(route, "foo-route");
   963        });
   964    })
   965    .await;
   966}
   967
   968#[tokio::test(flavor = "current_thread")]
   969async fn producer_route() {
   970    with_temp_ns(|client, ns| async move {
   971        // Create a service
   972        let svc = create_service(&client, &ns, "my-svc", 4191).await;
   973
   974        let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
   975        let producer_config = producer_rx
   976            .next()
   977            .await
   978            .expect("watch must not fail")
   979            .expect("watch must return an initial config");
   980        tracing::trace!(?producer_config);
   981
   982        let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
   983        let consumer_config = consumer_rx
   984            .next()
   985            .await
   986            .expect("watch must not fail")
   987            .expect("watch must return an initial config");
   988        tracing::trace!(?consumer_config);
   989
   990        // There should be a default route.
   991        detect_http_routes(&producer_config, |routes| {
   992            let route = assert_singleton(routes);
   993            assert_route_is_default(route, &svc, 4191);
   994        });
   995        detect_http_routes(&consumer_config, |routes| {
   996            let route = assert_singleton(routes);
   997            assert_route_is_default(route, &svc, 4191);
   998        });
   999
  1000        // A route created in the same namespace as its parent service is called
  1001        // a producer route. It should be returned in outbound policy requests
  1002        // for that service from ALL namespaces.
  1003        let _route = create(
  1004            &client,
  1005            mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
  1006        )
  1007        .await;
  1008
  1009        let producer_config = producer_rx
  1010            .next()
  1011            .await
  1012            .expect("watch must not fail")
  1013            .expect("watch must return an updated config");
  1014        tracing::trace!(?producer_config);
  1015        let consumer_config = consumer_rx
  1016            .next()
  1017            .await
  1018            .expect("watch must not fail")
  1019            .expect("watch must return an initial config");
  1020        tracing::trace!(?consumer_config);
  1021
  1022        // The route should be returned in queries from the producer namespace.
  1023        detect_http_routes(&producer_config, |routes| {
  1024            let route = assert_singleton(routes);
  1025            assert_route_name_eq(route, "foo-route");
  1026        });
  1027
  1028        // The route should be returned in queries from a consumer namespace.
  1029        detect_http_routes(&consumer_config, |routes| {
  1030            let route = assert_singleton(routes);
  1031            assert_route_name_eq(route, "foo-route");
  1032        });
  1033    })
  1034    .await;
  1035}
  1036
  1037#[tokio::test(flavor = "current_thread")]
  1038async fn pre_existing_producer_route() {
  1039    // We test the scenario where outbound policy watches are initiated after
  1040    // a produce route already exists.
  1041    with_temp_ns(|client, ns| async move {
  1042        // Create a service
  1043        let svc = create_service(&client, &ns, "my-svc", 4191).await;
  1044
  1045        // A route created in the same namespace as its parent service is called
  1046        // a producer route. It should be returned in outbound policy requests
  1047        // for that service from ALL namespaces.
  1048        let _route = create(
  1049            &client,
  1050            mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
  1051        )
  1052        .await;
  1053
  1054        let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
  1055        let producer_config = producer_rx
  1056            .next()
  1057            .await
  1058            .expect("watch must not fail")
  1059            .expect("watch must return an initial config");
  1060        tracing::trace!(?producer_config);
  1061
  1062        let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
  1063        let consumer_config = consumer_rx
  1064            .next()
  1065            .await
  1066            .expect("watch must not fail")
  1067            .expect("watch must return an initial config");
  1068        tracing::trace!(?consumer_config);
  1069
  1070        // The route should be returned in queries from the producer namespace.
  1071        detect_http_routes(&producer_config, |routes| {
  1072            let route = assert_singleton(routes);
  1073            assert_route_name_eq(route, "foo-route");
  1074        });
  1075
  1076        // The route should be returned in queries from a consumer namespace.
  1077        detect_http_routes(&consumer_config, |routes| {
  1078            let route = assert_singleton(routes);
  1079            assert_route_name_eq(route, "foo-route");
  1080        });
  1081    })
  1082    .await;
  1083}
  1084
  1085#[tokio::test(flavor = "current_thread")]
  1086async fn consumer_route() {
  1087    with_temp_ns(|client, ns| async move {
  1088        // Create a service
  1089        let svc = create_service(&client, &ns, "my-svc", 4191).await;
  1090
  1091        let consumer_ns_name = format!("{}-consumer", ns);
  1092        let consumer_ns = create_cluster_scoped(
  1093            &client,
  1094            k8s::Namespace {
  1095                metadata: k8s::ObjectMeta {
  1096                    name: Some(consumer_ns_name.clone()),
  1097                    labels: Some(convert_args!(btreemap!(
  1098                        "linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
  1099                    ))),
  1100                    ..Default::default()
  1101                },
  1102                ..Default::default()
  1103            },
  1104        )
  1105        .await;
  1106
  1107        let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
  1108        let producer_config = producer_rx
  1109            .next()
  1110            .await
  1111            .expect("watch must not fail")
  1112            .expect("watch must return an initial config");
  1113        tracing::trace!(?producer_config);
  1114
  1115        let mut consumer_rx =
  1116            retry_watch_outbound_policy(&client, &consumer_ns_name, &svc, 4191).await;
  1117        let consumer_config = consumer_rx
  1118            .next()
  1119            .await
  1120            .expect("watch must not fail")
  1121            .expect("watch must return an initial config");
  1122        tracing::trace!(?consumer_config);
  1123
  1124        let mut other_rx = retry_watch_outbound_policy(&client, "other_ns", &svc, 4191).await;
  1125        let other_config = other_rx
  1126            .next()
  1127            .await
  1128            .expect("watch must not fail")
  1129            .expect("watch must return an initial config");
  1130        tracing::trace!(?other_config);
  1131
  1132        // There should be a default route.
  1133        detect_http_routes(&producer_config, |routes| {
  1134            let route = assert_singleton(routes);
  1135            assert_route_is_default(route, &svc, 4191);
  1136        });
  1137        detect_http_routes(&consumer_config, |routes| {
  1138            let route = assert_singleton(routes);
  1139            assert_route_is_default(route, &svc, 4191);
  1140        });
  1141        detect_http_routes(&other_config, |routes| {
  1142            let route = assert_singleton(routes);
  1143            assert_route_is_default(route, &svc, 4191);
  1144        });
  1145
  1146        // A route created in a different namespace as its parent service is
  1147        // called a consumer route. It should be returned in outbound policy
  1148        // requests for that service ONLY when the request comes from the
  1149        // consumer namespace.
  1150        let _route = create(
  1151            &client,
  1152            mk_http_route(&consumer_ns_name, "foo-route", &svc, Some(4191)).build(),
  1153        )
  1154        .await;
  1155
  1156        // The route should NOT be returned in queries from the producer namespace.
  1157        // There should be a default route.
  1158        assert!(producer_rx.next().now_or_never().is_none());
  1159
  1160        // The route should be returned in queries from the same consumer
  1161        // namespace.
  1162        let consumer_config = consumer_rx
  1163            .next()
  1164            .await
  1165            .expect("watch must not fail")
  1166            .expect("watch must return an initial config");
  1167        tracing::trace!(?consumer_config);
  1168        detect_http_routes(&consumer_config, |routes| {
  1169            let route = assert_singleton(routes);
  1170            assert_route_name_eq(route, "foo-route");
  1171        });
  1172
  1173        // The route should NOT be returned in queries from a different consumer
  1174        // namespace.
  1175        assert!(other_rx.next().now_or_never().is_none());
  1176
  1177        delete_cluster_scoped(&client, consumer_ns).await;
  1178    })
  1179    .await;
  1180}
  1181
  1182/* Helpers */
  1183
  1184struct HttpRouteBuilder(k8s::policy::HttpRoute);
  1185
  1186async fn retry_watch_outbound_policy(
  1187    client: &kube::Client,
  1188    ns: &str,
  1189    svc: &k8s::Service,
  1190    port: u16,
  1191) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
  1192    // Port-forward to the control plane and start watching the service's
  1193    // outbound policy.
  1194    let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
  1195    loop {
  1196        match policy_api.watch(ns, svc, port).await {
  1197            Ok(rx) => return rx,
  1198            Err(error) => {
  1199                tracing::error!(
  1200                    ?error,
  1201                    ns,
  1202                    svc = svc.name_unchecked(),
  1203                    "failed to watch outbound policy for port 4191"
  1204                );
  1205                time::sleep(Duration::from_secs(1)).await;
  1206            }
  1207        }
  1208    }
  1209}
  1210
  1211fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
  1212    use k8s::policy::httproute as api;
  1213
  1214    HttpRouteBuilder(api::HttpRoute {
  1215        metadata: kube::api::ObjectMeta {
  1216            namespace: Some(ns.to_string()),
  1217            name: Some(name.to_string()),
  1218            ..Default::default()
  1219        },
  1220        spec: api::HttpRouteSpec {
  1221            inner: api::CommonRouteSpec {
  1222                parent_refs: Some(vec![api::ParentReference {
  1223                    group: Some("core".to_string()),
  1224                    kind: Some("Service".to_string()),
  1225                    namespace: svc.namespace(),
  1226                    name: svc.name_unchecked(),
  1227                    section_name: None,
  1228                    port,
  1229                }]),
  1230            },
  1231            hostnames: None,
  1232            rules: Some(vec![api::HttpRouteRule {
  1233                matches: Some(vec![api::HttpRouteMatch {
  1234                    path: Some(api::HttpPathMatch::Exact {
  1235                        value: "/foo".to_string(),
  1236                    }),
  1237                    headers: None,
  1238                    query_params: None,
  1239                    method: Some("GET".to_string()),
  1240                }]),
  1241                filters: None,
  1242                backend_refs: None,
  1243                timeouts: None,
  1244            }]),
  1245        },
  1246        status: None,
  1247    })
  1248}
  1249
  1250impl HttpRouteBuilder {
  1251    fn with_backends(
  1252        self,
  1253        backends: Option<&[&str]>,
  1254        backends_ns: Option<String>,
  1255        backend_filters: Option<Vec<k8s_gateway_api::HttpRouteFilter>>,
  1256    ) -> Self {
  1257        let mut route = self.0;
  1258        let backend_refs = backends.map(|names| {
  1259            names
  1260                .iter()
  1261                .map(|name| k8s::policy::httproute::HttpBackendRef {
  1262                    backend_ref: Some(k8s_gateway_api::BackendRef {
  1263                        weight: None,
  1264                        inner: k8s_gateway_api::BackendObjectReference {
  1265                            name: name.to_string(),
  1266                            port: Some(8888),
  1267                            group: None,
  1268                            kind: None,
  1269                            namespace: backends_ns.clone(),
  1270                        },
  1271                    }),
  1272                    filters: backend_filters.clone(),
  1273                })
  1274                .collect()
  1275        });
  1276        route.spec.rules.iter_mut().flatten().for_each(|rule| {
  1277            rule.backend_refs = backend_refs.clone();
  1278        });
  1279        Self(route)
  1280    }
  1281
  1282    fn with_filters(self, filters: Option<Vec<k8s::policy::httproute::HttpRouteFilter>>) -> Self {
  1283        let mut route = self.0;
  1284        route
  1285            .spec
  1286            .rules
  1287            .iter_mut()
  1288            .flatten()
  1289            .for_each(|rule| rule.filters = filters.clone());
  1290        Self(route)
  1291    }
  1292
  1293    fn build(self) -> k8s::policy::HttpRoute {
  1294        self.0
  1295    }
  1296}
  1297
  1298fn mk_empty_http_route(
  1299    ns: &str,
  1300    name: &str,
  1301    svc: &k8s::Service,
  1302    port: u16,
  1303) -> k8s::policy::HttpRoute {
  1304    use k8s::policy::httproute as api;
  1305    api::HttpRoute {
  1306        metadata: kube::api::ObjectMeta {
  1307            namespace: Some(ns.to_string()),
  1308            name: Some(name.to_string()),
  1309            ..Default::default()
  1310        },
  1311        spec: api::HttpRouteSpec {
  1312            inner: api::CommonRouteSpec {
  1313                parent_refs: Some(vec![api::ParentReference {
  1314                    group: Some("core".to_string()),
  1315                    kind: Some("Service".to_string()),
  1316                    namespace: svc.namespace(),
  1317                    name: svc.name_unchecked(),
  1318                    section_name: None,
  1319                    port: Some(port),
  1320                }]),
  1321            },
  1322            hostnames: None,
  1323            rules: Some(vec![]),
  1324        },
  1325        status: None,
  1326    }
  1327}
  1328
  1329// detect_http_routes asserts that the given outbound policy has a proxy protcol
  1330// of "Detect" and then invokes the given function with the Http1 and Http2
  1331// routes from the Detect.
  1332#[track_caller]
  1333fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
  1334where
  1335    F: Fn(&[grpc::outbound::HttpRoute]),
  1336{
  1337    let kind = config
  1338        .protocol
  1339        .as_ref()
  1340        .expect("must have proxy protocol")
  1341        .kind
  1342        .as_ref()
  1343        .expect("must have kind");
  1344    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
  1345        opaque: _,
  1346        timeout: _,
  1347        http1,
  1348        http2,
  1349    }) = kind
  1350    {
  1351        let http1 = http1
  1352            .as_ref()
  1353            .expect("proxy protocol must have http1 field");
  1354        let http2 = http2
  1355            .as_ref()
  1356            .expect("proxy protocol must have http2 field");
  1357        f(&http1.routes);
  1358        f(&http2.routes);
  1359    } else {
  1360        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
  1361    }
  1362}
  1363
  1364#[track_caller]
  1365fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
  1366where
  1367    F: Fn(Option<&grpc::outbound::FailureAccrual>),
  1368{
  1369    let kind = config
  1370        .protocol
  1371        .as_ref()
  1372        .expect("must have proxy protocol")
  1373        .kind
  1374        .as_ref()
  1375        .expect("must have kind");
  1376    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
  1377        opaque: _,
  1378        timeout: _,
  1379        http1,
  1380        http2,
  1381    }) = kind
  1382    {
  1383        let http1 = http1
  1384            .as_ref()
  1385            .expect("proxy protocol must have http1 field");
  1386        let http2 = http2
  1387            .as_ref()
  1388            .expect("proxy protocol must have http2 field");
  1389        f(http1.failure_accrual.as_ref());
  1390        f(http2.failure_accrual.as_ref());
  1391    } else {
  1392        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
  1393    }
  1394}
  1395
  1396#[track_caller]
  1397fn failure_accrual_consecutive(
  1398    accrual: Option<&grpc::outbound::FailureAccrual>,
  1399) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
  1400    assert!(
  1401        accrual.is_some(),
  1402        "failure accrual must be configured for service"
  1403    );
  1404    let kind = accrual
  1405        .unwrap()
  1406        .kind
  1407        .as_ref()
  1408        .expect("failure accrual must have kind");
  1409    let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
  1410    accrual
  1411}
  1412
  1413#[track_caller]
  1414fn route_backends_first_available(
  1415    route: &grpc::outbound::HttpRoute,
  1416) -> &[grpc::outbound::http_route::RouteBackend] {
  1417    let kind = assert_singleton(&route.rules)
  1418        .backends
  1419        .as_ref()
  1420        .expect("Rule must have backends")
  1421        .kind
  1422        .as_ref()
  1423        .expect("Backend must have kind");
  1424    match kind {
  1425        grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
  1426        _ => panic!("Distribution must be FirstAvailable"),
  1427    }
  1428}
  1429
  1430#[track_caller]
  1431fn route_backends_random_available(
  1432    route: &grpc::outbound::HttpRoute,
  1433) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
  1434    let kind = assert_singleton(&route.rules)
  1435        .backends
  1436        .as_ref()
  1437        .expect("Rule must have backends")
  1438        .kind
  1439        .as_ref()
  1440        .expect("Backend must have kind");
  1441    match kind {
  1442        grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
  1443        _ => panic!("Distribution must be RandomAvailable"),
  1444    }
  1445}
  1446
  1447#[track_caller]
  1448fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
  1449    match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
  1450        grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
  1451        _ => panic!("route must be a resource kind"),
  1452    }
  1453}
  1454
  1455#[track_caller]
  1456fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::WeightedRouteBackend) {
  1457    let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
  1458    match filter.kind.as_ref().unwrap() {
  1459        grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
  1460        _ => panic!("backend must have FailureInjector filter"),
  1461    };
  1462}
  1463
  1464#[track_caller]
  1465fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
  1466    let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
  1467    match kind {
  1468        grpc::meta::metadata::Kind::Default(_) => {}
  1469        grpc::meta::metadata::Kind::Resource(r) => {
  1470            panic!("route expected to be default but got resource {r:?}")
  1471        }
  1472    }
  1473
  1474    let backends = route_backends_first_available(route);
  1475    let backend = assert_singleton(backends);
  1476    assert_backend_matches_service(backend, svc, port);
  1477
  1478    let rule = assert_singleton(&route.rules);
  1479    let route_match = assert_singleton(&rule.matches);
  1480    let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
  1481    assert_eq!(
  1482        *path_match,
  1483        grpc::http_route::path_match::Kind::Prefix("/".to_string())
  1484    );
  1485}
  1486
  1487#[track_caller]
  1488fn assert_backend_matches_service(
  1489    backend: &grpc::outbound::http_route::RouteBackend,
  1490    svc: &k8s::Service,
  1491    port: u16,
  1492) {
  1493    let backend = backend.backend.as_ref().unwrap();
  1494    let dst = match backend.kind.as_ref().unwrap() {
  1495        grpc::outbound::backend::Kind::Balancer(balance) => {
  1496            let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
  1497            match kind {
  1498                grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
  1499            }
  1500        }
  1501        grpc::outbound::backend::Kind::Forward(_) => {
  1502            panic!("default route backend must be Balancer")
  1503        }
  1504    };
  1505    assert_eq!(
  1506        *dst,
  1507        format!(
  1508            "{}.{}.svc.{}:{}",
  1509            svc.name_unchecked(),
  1510            svc.namespace().unwrap(),
  1511            "cluster.local",
  1512            port
  1513        )
  1514    );
  1515
  1516    assert_svc_meta(&backend.metadata, svc, port)
  1517}
  1518
  1519#[track_caller]
  1520fn assert_singleton<T>(ts: &[T]) -> &T {
  1521    assert_eq!(ts.len(), 1);
  1522    ts.first().unwrap()
  1523}
  1524
  1525#[track_caller]
  1526fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
  1527    let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
  1528    match kind {
  1529        grpc::meta::metadata::Kind::Default(d) => {
  1530            panic!("route expected to not be default, but got default {d:?}")
  1531        }
  1532        grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
  1533    }
  1534}

View as plain text