...

Text file src/github.com/linkerd/linkerd2/policy-test/tests/inbound_api_external_workload.rs

Documentation: github.com/linkerd/linkerd2/policy-test/tests

     1use std::{num::NonZeroU16, time::Duration};
     2
     3use futures::prelude::*;
     4use k8s::policy::LocalTargetRef;
     5use kube::ResourceExt;
     6use linkerd_policy_controller_core::{Ipv4Net, Ipv6Net};
     7use linkerd_policy_controller_k8s_api as k8s;
     8use linkerd_policy_test::{
     9    assert_default_all_unauthenticated_labels, assert_is_default_all_unauthenticated,
    10    assert_protocol_detect_external, create, grpc, with_temp_ns,
    11};
    12use maplit::{btreemap, convert_args, hashmap};
    13use tokio::time;
    14
    15#[tokio::test(flavor = "current_thread")]
    16async fn external_workload_srv_with_authorization_policy() {
    17    with_temp_ns(|client, ns| async move {
    18        // Create an external workload object.
    19        let ext_workload = create(&client, mk_external_workload(&ns, "wkld-1")).await;
    20
    21        tracing::trace!(
    22            external_workload = %ext_workload.name_any(),
    23            ip = ?ext_workload.spec.workload_ips.as_ref().unwrap()[0]
    24        );
    25
    26        let mut rx = retry_watch_server(&client, &ns, &ext_workload.name_any()).await;
    27        let config = rx
    28            .next()
    29            .await
    30            .expect("watch must not fail")
    31            .expect("watch must return an initial config");
    32        tracing::trace!(?config);
    33        assert_is_default_all_unauthenticated!(config);
    34        assert_protocol_detect_external!(config);
    35
    36        // Create a server that selects the http port on the workload and ensure
    37        // the update now uses this server (which has no authorizations)
    38        let server = create(&client, mk_http_server(&ns, "external-http")).await;
    39        let config = rx
    40            .next()
    41            .await
    42            .expect("watch must not fail")
    43            .expect("watch must return an initial config");
    44        assert_eq!(
    45            config.protocol,
    46            Some(grpc::defaults::proxy_protocol_external())
    47        );
    48        assert_eq!(config.authorizations, vec![]);
    49        assert_eq!(
    50            config.labels,
    51            convert_args!(hashmap!(
    52                "group" => "policy.linkerd.io",
    53                "kind" => "server",
    54                "name" => "external-http"
    55            )),
    56        );
    57
    58        // Create a server authorization that refers to the `linkerd-admin`
    59        // server (by name) and ensure that the update now reflects this
    60        // authorization.
    61        create(
    62            &client,
    63            k8s::policy::AuthorizationPolicy {
    64                metadata: kube::api::ObjectMeta {
    65                    namespace: Some(ns.clone()),
    66                    name: Some("all-http".to_string()),
    67                    ..Default::default()
    68                },
    69                spec: k8s::policy::AuthorizationPolicySpec {
    70                    target_ref: LocalTargetRef {
    71                        group: Some("policy.linkerd.io".to_string()),
    72                        kind: "server".to_string(),
    73                        name: server.name_any(),
    74                    },
    75                    required_authentication_refs: vec![],
    76                },
    77            },
    78        )
    79        .await;
    80        let config = rx
    81            .next()
    82            .await
    83            .expect("watch must not fail")
    84            .expect("watch must return an updated config");
    85        tracing::trace!(?config);
    86        assert_eq!(
    87            config.protocol,
    88            Some(grpc::defaults::proxy_protocol_external())
    89        );
    90        assert_eq!(
    91            config.authorizations.first().unwrap().labels,
    92            convert_args!(hashmap!(
    93                "group" => "policy.linkerd.io",
    94                "kind" => "authorizationpolicy",
    95                "name" => "all-http",
    96            )),
    97        );
    98        assert_eq!(
    99            *config
   100                .authorizations
   101                .first()
   102                .unwrap()
   103                .authentication
   104                .as_ref()
   105                .unwrap(),
   106            grpc::inbound::Authn {
   107                permit: Some(grpc::inbound::authn::Permit::Unauthenticated(
   108                    grpc::inbound::authn::PermitUnauthenticated {}
   109                )),
   110            }
   111        );
   112        assert_eq!(
   113            config.labels,
   114            convert_args!(hashmap!(
   115                "group" => "policy.linkerd.io",
   116                "kind" => "server",
   117                "name" => server.name_unchecked()
   118            ))
   119        );
   120
   121        // Delete the `Server` and ensure that the update reverts to the
   122        // default.
   123        kube::Api::<k8s::policy::Server>::namespaced(client.clone(), &ns)
   124            .delete(
   125                &server.name_unchecked(),
   126                &kube::api::DeleteParams::default(),
   127            )
   128            .await
   129            .expect("Server must be deleted");
   130        let config = rx
   131            .next()
   132            .await
   133            .expect("watch must not fail")
   134            .expect("watch must return an updated config");
   135
   136        assert_is_default_all_unauthenticated!(config);
   137        assert_protocol_detect_external!(config);
   138    })
   139    .await
   140}
   141
   142#[tokio::test(flavor = "current_thread")]
   143async fn external_workload_srv_with_http_route() {
   144    with_temp_ns(|client, ns| async move {
   145        // Create an external workload object.
   146        let ext_workload = create(&client, mk_external_workload(&ns, "wkld-1")).await;
   147
   148        tracing::trace!(
   149            external_workload = %ext_workload.name_any(),
   150            ip = ?ext_workload.spec.workload_ips.as_ref().unwrap()[0]
   151        );
   152
   153        let mut rx = retry_watch_server(&client, &ns, &ext_workload.name_any()).await;
   154        let config = rx
   155            .next()
   156            .await
   157            .expect("watch must not fail")
   158            .expect("watch must return an initial config");
   159        tracing::trace!(?config);
   160        assert_is_default_all_unauthenticated!(config);
   161        assert_protocol_detect_external!(config);
   162
   163        // Create a server that selects the http port on the workload and ensure
   164        // the update now uses this server (which has no authorizations)
   165        let server = create(&client, mk_http_server(&ns, "external-http")).await;
   166        let config = rx
   167            .next()
   168            .await
   169            .expect("watch must not fail")
   170            .expect("watch must return an initial config");
   171        assert_eq!(
   172            config.protocol,
   173            Some(grpc::defaults::proxy_protocol_external())
   174        );
   175        assert_eq!(config.authorizations, vec![]);
   176        assert_eq!(
   177            config.labels,
   178            convert_args!(hashmap!(
   179                "group" => "policy.linkerd.io",
   180                "kind" => "server",
   181                "name" => "external-http"
   182            )),
   183        );
   184
   185        let created_route = {
   186            use k8s::policy::httproute as api;
   187            let http_route = api::HttpRoute {
   188                metadata: kube::api::ObjectMeta {
   189                    namespace: Some(ns.to_string()),
   190                    name: Some("http-route".to_string()),
   191                    ..Default::default()
   192                },
   193                spec: api::HttpRouteSpec {
   194                    inner: api::CommonRouteSpec {
   195                        parent_refs: Some(vec![api::ParentReference {
   196                            group: Some("policy.linkerd.io".to_string()),
   197                            kind: Some("Server".to_string()),
   198                            name: server.name_any(),
   199                            namespace: None,
   200                            section_name: None,
   201                            port: None,
   202                        }]),
   203                    },
   204                    hostnames: None,
   205                    rules: Some(vec![api::HttpRouteRule {
   206                        matches: Some(vec![api::HttpRouteMatch {
   207                            path: Some(api::HttpPathMatch::Exact {
   208                                value: "/endpoint".to_string(),
   209                            }),
   210                            headers: None,
   211                            query_params: None,
   212                            method: Some("GET".to_string()),
   213                        }]),
   214                        filters: None,
   215                        backend_refs: None,
   216                        timeouts: None,
   217                    }]),
   218                },
   219                status: None,
   220            };
   221
   222            create(&client, http_route).await
   223        };
   224
   225        let config = rx
   226            .next()
   227            .await
   228            .expect("watch must not fail")
   229            .expect("watch must return an initial config");
   230        let kind = config
   231            .protocol
   232            .as_ref()
   233            .expect("must have proxy protocol")
   234            .kind
   235            .as_ref()
   236            .expect("must have kind");
   237        let routes = if let grpc::inbound::proxy_protocol::Kind::Http1(ref http1) = kind {
   238            &http1.routes[..]
   239        } else {
   240            panic!("proxy protocol must be 'Http1'; actually got:\n{kind:#?}");
   241        };
   242
   243        assert_eq!(routes.len(), 1);
   244        let route = routes.first().expect("must have route");
   245        // Route should have no authz policy by default
   246        assert_eq!(route.authorizations, vec![]);
   247        assert_eq!(
   248            route.metadata.to_owned().expect("route must have metadata"),
   249            grpc::meta::Metadata {
   250                kind: Some(grpc::meta::metadata::Kind::Resource(grpc::meta::Resource {
   251                    group: "policy.linkerd.io".to_string(),
   252                    kind: "HTTPRoute".to_string(),
   253                    name: "http-route".to_string(),
   254                    ..Default::default()
   255                }))
   256            }
   257        );
   258
   259        // Route has path match
   260        let rule_match = route
   261            .rules
   262            .first()
   263            .expect("must have rule")
   264            .matches
   265            .first()
   266            .expect("must have match");
   267        assert_eq!(
   268            rule_match
   269                .path
   270                .to_owned()
   271                .expect("must have path match")
   272                .kind
   273                .expect("must have kind"),
   274            grpc::http_route::path_match::Kind::Exact("/endpoint".to_string())
   275        );
   276
   277        // Create a network authn and a policy that refers to the route
   278        let all_networks = create(
   279            &client,
   280            k8s::policy::NetworkAuthentication {
   281                metadata: kube::api::ObjectMeta {
   282                    namespace: Some(ns.clone()),
   283                    name: Some("all-net".to_string()),
   284                    ..Default::default()
   285                },
   286                spec: k8s::policy::NetworkAuthenticationSpec {
   287                    networks: vec![
   288                        k8s::policy::network_authentication::Network {
   289                            cidr: Ipv4Net::default().into(),
   290                            except: None,
   291                        },
   292                        k8s::policy::network_authentication::Network {
   293                            cidr: Ipv6Net::default().into(),
   294                            except: None,
   295                        },
   296                    ],
   297                },
   298            },
   299        )
   300        .await;
   301        create(
   302            &client,
   303            k8s::policy::AuthorizationPolicy {
   304                metadata: kube::api::ObjectMeta {
   305                    namespace: Some(ns.clone()),
   306                    name: Some("all-net".to_string()),
   307                    ..Default::default()
   308                },
   309                spec: k8s::policy::AuthorizationPolicySpec {
   310                    target_ref: k8s::policy::LocalTargetRef::from_resource(&created_route),
   311                    required_authentication_refs: vec![
   312                        k8s::policy::NamespacedTargetRef::from_resource(&all_networks),
   313                    ],
   314                },
   315            },
   316        )
   317        .await;
   318
   319        let config = rx
   320            .next()
   321            .await
   322            .expect("watch must not fail")
   323            .expect("watch must return an initial config");
   324        let http1 = if let grpc::inbound::proxy_protocol::Kind::Http1(http1) = config
   325            .protocol
   326            .expect("must have proxy protocol")
   327            .kind
   328            .expect("must have kind")
   329        {
   330            http1
   331        } else {
   332            panic!("proxy protocol must be HTTP1");
   333        };
   334        let h1_route = http1.routes.first().expect("must have route");
   335        assert_eq!(h1_route.authorizations.len(), 1, "must have authorizations");
   336
   337        // Delete the `HttpRoute` and ensure that the update reverts to the
   338        // default.
   339        kube::Api::<k8s::policy::HttpRoute>::namespaced(client.clone(), &ns)
   340            .delete("http-route", &kube::api::DeleteParams::default())
   341            .await
   342            .expect("HttpRoute must be deleted");
   343        let config = rx
   344            .next()
   345            .await
   346            .expect("watch must not fail")
   347            .expect("watch must return an initial config");
   348        assert_eq!(
   349            config.protocol,
   350            Some(grpc::defaults::proxy_protocol_external())
   351        );
   352    })
   353    .await;
   354}
   355#[tokio::test(flavor = "current_thread")]
   356async fn external_workload_default_http_route() {
   357    with_temp_ns(|client, ns| async move {
   358        // Create an external workload object.
   359        let ext_workload = create(&client, mk_external_workload(&ns, "wkld-1")).await;
   360
   361        tracing::trace!(
   362            external_workload = %ext_workload.name_any(),
   363            ip = ?ext_workload.spec.workload_ips.as_ref().unwrap()[0]
   364        );
   365
   366        let mut rx = retry_watch_server(&client, &ns, &ext_workload.name_any()).await;
   367        let config = rx
   368            .next()
   369            .await
   370            .expect("watch must not fail")
   371            .expect("watch must return an initial config");
   372        tracing::trace!(?config);
   373        assert_is_default_all_unauthenticated!(config);
   374        assert_protocol_detect_external!(config);
   375
   376        let kind = config
   377            .protocol
   378            .as_ref()
   379            .expect("must have proxy protocol")
   380            .kind
   381            .as_ref()
   382            .expect("must have kind");
   383        let routes = if let grpc::inbound::proxy_protocol::Kind::Detect(ref detect) = kind {
   384            &detect.http_routes[..]
   385        } else {
   386            panic!("proxy protocol must be 'Detect'; actually got:\n{kind:#?}");
   387        };
   388
   389        assert_eq!(routes.len(), 1);
   390        let route_authzs = &routes[0].authorizations;
   391        assert_eq!(route_authzs.len(), 0);
   392    })
   393    .await
   394}
   395
   396fn mk_external_workload(ns: &str, name: &str) -> k8s::external_workload::ExternalWorkload {
   397    k8s::external_workload::ExternalWorkload {
   398        metadata: k8s::ObjectMeta {
   399            namespace: Some(ns.into()),
   400            name: Some(name.into()),
   401            labels: Some(convert_args!(btreemap!(
   402                        "app" => "ext",
   403            ))),
   404            ..Default::default()
   405        },
   406        spec: k8s::external_workload::ExternalWorkloadSpec {
   407            mesh_tls: k8s::external_workload::MeshTls {
   408                identity: "some-identity".to_string(),
   409                server_name: "some-sni".to_string(),
   410            },
   411            ports: Some(vec![k8s::external_workload::PortSpec {
   412                name: Some("http".into()),
   413                port: NonZeroU16::new(80).unwrap(),
   414                protocol: Default::default(),
   415            }]),
   416            workload_ips: Some(vec![k8s::external_workload::WorkloadIP {
   417                ip: "192.0.2.0".to_string(),
   418            }]),
   419        },
   420        status: None,
   421    }
   422}
   423
   424async fn retry_watch_server(
   425    client: &kube::Client,
   426    ns: &str,
   427    workload_name: &str,
   428) -> tonic::Streaming<grpc::inbound::Server> {
   429    let mut policy_api = grpc::InboundPolicyClient::port_forwarded(client).await;
   430    loop {
   431        match policy_api
   432            .watch_port_for_external_workload(ns, workload_name, 80)
   433            .await
   434        {
   435            Ok(rx) => return rx,
   436            Err(error) => {
   437                tracing::error!(
   438                    ?error,
   439                    ns,
   440                    workload_name,
   441                    "failed to watch policy for port 80"
   442                );
   443                time::sleep(Duration::from_secs(1)).await;
   444            }
   445        }
   446    }
   447}
   448
   449fn mk_http_server(ns: &str, name: &str) -> k8s::policy::Server {
   450    k8s::policy::Server {
   451        metadata: k8s::ObjectMeta {
   452            namespace: Some(ns.to_string()),
   453            name: Some(name.to_string()),
   454            ..Default::default()
   455        },
   456        spec: k8s::policy::ServerSpec {
   457            selector: k8s::policy::server::Selector::ExternalWorkload(
   458                k8s::labels::Selector::default(),
   459            ),
   460            port: k8s::policy::server::Port::Name("http".to_string()),
   461            proxy_protocol: Some(k8s::policy::server::ProxyProtocol::Http1),
   462        },
   463    }
   464}

View as plain text