1use std::{num::NonZeroU16, time::Duration};
2
3use futures::prelude::*;
4use k8s::policy::LocalTargetRef;
5use kube::ResourceExt;
6use linkerd_policy_controller_core::{Ipv4Net, Ipv6Net};
7use linkerd_policy_controller_k8s_api as k8s;
8use linkerd_policy_test::{
9 assert_default_all_unauthenticated_labels, assert_is_default_all_unauthenticated,
10 assert_protocol_detect_external, create, grpc, with_temp_ns,
11};
12use maplit::{btreemap, convert_args, hashmap};
13use tokio::time;
14
15#[tokio::test(flavor = "current_thread")]
16async fn external_workload_srv_with_authorization_policy() {
17 with_temp_ns(|client, ns| async move {
18 // Create an external workload object.
19 let ext_workload = create(&client, mk_external_workload(&ns, "wkld-1")).await;
20
21 tracing::trace!(
22 external_workload = %ext_workload.name_any(),
23 ip = ?ext_workload.spec.workload_ips.as_ref().unwrap()[0]
24 );
25
26 let mut rx = retry_watch_server(&client, &ns, &ext_workload.name_any()).await;
27 let config = rx
28 .next()
29 .await
30 .expect("watch must not fail")
31 .expect("watch must return an initial config");
32 tracing::trace!(?config);
33 assert_is_default_all_unauthenticated!(config);
34 assert_protocol_detect_external!(config);
35
36 // Create a server that selects the http port on the workload and ensure
37 // the update now uses this server (which has no authorizations)
38 let server = create(&client, mk_http_server(&ns, "external-http")).await;
39 let config = rx
40 .next()
41 .await
42 .expect("watch must not fail")
43 .expect("watch must return an initial config");
44 assert_eq!(
45 config.protocol,
46 Some(grpc::defaults::proxy_protocol_external())
47 );
48 assert_eq!(config.authorizations, vec![]);
49 assert_eq!(
50 config.labels,
51 convert_args!(hashmap!(
52 "group" => "policy.linkerd.io",
53 "kind" => "server",
54 "name" => "external-http"
55 )),
56 );
57
58 // Create a server authorization that refers to the `linkerd-admin`
59 // server (by name) and ensure that the update now reflects this
60 // authorization.
61 create(
62 &client,
63 k8s::policy::AuthorizationPolicy {
64 metadata: kube::api::ObjectMeta {
65 namespace: Some(ns.clone()),
66 name: Some("all-http".to_string()),
67 ..Default::default()
68 },
69 spec: k8s::policy::AuthorizationPolicySpec {
70 target_ref: LocalTargetRef {
71 group: Some("policy.linkerd.io".to_string()),
72 kind: "server".to_string(),
73 name: server.name_any(),
74 },
75 required_authentication_refs: vec![],
76 },
77 },
78 )
79 .await;
80 let config = rx
81 .next()
82 .await
83 .expect("watch must not fail")
84 .expect("watch must return an updated config");
85 tracing::trace!(?config);
86 assert_eq!(
87 config.protocol,
88 Some(grpc::defaults::proxy_protocol_external())
89 );
90 assert_eq!(
91 config.authorizations.first().unwrap().labels,
92 convert_args!(hashmap!(
93 "group" => "policy.linkerd.io",
94 "kind" => "authorizationpolicy",
95 "name" => "all-http",
96 )),
97 );
98 assert_eq!(
99 *config
100 .authorizations
101 .first()
102 .unwrap()
103 .authentication
104 .as_ref()
105 .unwrap(),
106 grpc::inbound::Authn {
107 permit: Some(grpc::inbound::authn::Permit::Unauthenticated(
108 grpc::inbound::authn::PermitUnauthenticated {}
109 )),
110 }
111 );
112 assert_eq!(
113 config.labels,
114 convert_args!(hashmap!(
115 "group" => "policy.linkerd.io",
116 "kind" => "server",
117 "name" => server.name_unchecked()
118 ))
119 );
120
121 // Delete the `Server` and ensure that the update reverts to the
122 // default.
123 kube::Api::<k8s::policy::Server>::namespaced(client.clone(), &ns)
124 .delete(
125 &server.name_unchecked(),
126 &kube::api::DeleteParams::default(),
127 )
128 .await
129 .expect("Server must be deleted");
130 let config = rx
131 .next()
132 .await
133 .expect("watch must not fail")
134 .expect("watch must return an updated config");
135
136 assert_is_default_all_unauthenticated!(config);
137 assert_protocol_detect_external!(config);
138 })
139 .await
140}
141
142#[tokio::test(flavor = "current_thread")]
143async fn external_workload_srv_with_http_route() {
144 with_temp_ns(|client, ns| async move {
145 // Create an external workload object.
146 let ext_workload = create(&client, mk_external_workload(&ns, "wkld-1")).await;
147
148 tracing::trace!(
149 external_workload = %ext_workload.name_any(),
150 ip = ?ext_workload.spec.workload_ips.as_ref().unwrap()[0]
151 );
152
153 let mut rx = retry_watch_server(&client, &ns, &ext_workload.name_any()).await;
154 let config = rx
155 .next()
156 .await
157 .expect("watch must not fail")
158 .expect("watch must return an initial config");
159 tracing::trace!(?config);
160 assert_is_default_all_unauthenticated!(config);
161 assert_protocol_detect_external!(config);
162
163 // Create a server that selects the http port on the workload and ensure
164 // the update now uses this server (which has no authorizations)
165 let server = create(&client, mk_http_server(&ns, "external-http")).await;
166 let config = rx
167 .next()
168 .await
169 .expect("watch must not fail")
170 .expect("watch must return an initial config");
171 assert_eq!(
172 config.protocol,
173 Some(grpc::defaults::proxy_protocol_external())
174 );
175 assert_eq!(config.authorizations, vec![]);
176 assert_eq!(
177 config.labels,
178 convert_args!(hashmap!(
179 "group" => "policy.linkerd.io",
180 "kind" => "server",
181 "name" => "external-http"
182 )),
183 );
184
185 let created_route = {
186 use k8s::policy::httproute as api;
187 let http_route = api::HttpRoute {
188 metadata: kube::api::ObjectMeta {
189 namespace: Some(ns.to_string()),
190 name: Some("http-route".to_string()),
191 ..Default::default()
192 },
193 spec: api::HttpRouteSpec {
194 inner: api::CommonRouteSpec {
195 parent_refs: Some(vec![api::ParentReference {
196 group: Some("policy.linkerd.io".to_string()),
197 kind: Some("Server".to_string()),
198 name: server.name_any(),
199 namespace: None,
200 section_name: None,
201 port: None,
202 }]),
203 },
204 hostnames: None,
205 rules: Some(vec![api::HttpRouteRule {
206 matches: Some(vec![api::HttpRouteMatch {
207 path: Some(api::HttpPathMatch::Exact {
208 value: "/endpoint".to_string(),
209 }),
210 headers: None,
211 query_params: None,
212 method: Some("GET".to_string()),
213 }]),
214 filters: None,
215 backend_refs: None,
216 timeouts: None,
217 }]),
218 },
219 status: None,
220 };
221
222 create(&client, http_route).await
223 };
224
225 let config = rx
226 .next()
227 .await
228 .expect("watch must not fail")
229 .expect("watch must return an initial config");
230 let kind = config
231 .protocol
232 .as_ref()
233 .expect("must have proxy protocol")
234 .kind
235 .as_ref()
236 .expect("must have kind");
237 let routes = if let grpc::inbound::proxy_protocol::Kind::Http1(ref http1) = kind {
238 &http1.routes[..]
239 } else {
240 panic!("proxy protocol must be 'Http1'; actually got:\n{kind:#?}");
241 };
242
243 assert_eq!(routes.len(), 1);
244 let route = routes.first().expect("must have route");
245 // Route should have no authz policy by default
246 assert_eq!(route.authorizations, vec![]);
247 assert_eq!(
248 route.metadata.to_owned().expect("route must have metadata"),
249 grpc::meta::Metadata {
250 kind: Some(grpc::meta::metadata::Kind::Resource(grpc::meta::Resource {
251 group: "policy.linkerd.io".to_string(),
252 kind: "HTTPRoute".to_string(),
253 name: "http-route".to_string(),
254 ..Default::default()
255 }))
256 }
257 );
258
259 // Route has path match
260 let rule_match = route
261 .rules
262 .first()
263 .expect("must have rule")
264 .matches
265 .first()
266 .expect("must have match");
267 assert_eq!(
268 rule_match
269 .path
270 .to_owned()
271 .expect("must have path match")
272 .kind
273 .expect("must have kind"),
274 grpc::http_route::path_match::Kind::Exact("/endpoint".to_string())
275 );
276
277 // Create a network authn and a policy that refers to the route
278 let all_networks = create(
279 &client,
280 k8s::policy::NetworkAuthentication {
281 metadata: kube::api::ObjectMeta {
282 namespace: Some(ns.clone()),
283 name: Some("all-net".to_string()),
284 ..Default::default()
285 },
286 spec: k8s::policy::NetworkAuthenticationSpec {
287 networks: vec![
288 k8s::policy::network_authentication::Network {
289 cidr: Ipv4Net::default().into(),
290 except: None,
291 },
292 k8s::policy::network_authentication::Network {
293 cidr: Ipv6Net::default().into(),
294 except: None,
295 },
296 ],
297 },
298 },
299 )
300 .await;
301 create(
302 &client,
303 k8s::policy::AuthorizationPolicy {
304 metadata: kube::api::ObjectMeta {
305 namespace: Some(ns.clone()),
306 name: Some("all-net".to_string()),
307 ..Default::default()
308 },
309 spec: k8s::policy::AuthorizationPolicySpec {
310 target_ref: k8s::policy::LocalTargetRef::from_resource(&created_route),
311 required_authentication_refs: vec![
312 k8s::policy::NamespacedTargetRef::from_resource(&all_networks),
313 ],
314 },
315 },
316 )
317 .await;
318
319 let config = rx
320 .next()
321 .await
322 .expect("watch must not fail")
323 .expect("watch must return an initial config");
324 let http1 = if let grpc::inbound::proxy_protocol::Kind::Http1(http1) = config
325 .protocol
326 .expect("must have proxy protocol")
327 .kind
328 .expect("must have kind")
329 {
330 http1
331 } else {
332 panic!("proxy protocol must be HTTP1");
333 };
334 let h1_route = http1.routes.first().expect("must have route");
335 assert_eq!(h1_route.authorizations.len(), 1, "must have authorizations");
336
337 // Delete the `HttpRoute` and ensure that the update reverts to the
338 // default.
339 kube::Api::<k8s::policy::HttpRoute>::namespaced(client.clone(), &ns)
340 .delete("http-route", &kube::api::DeleteParams::default())
341 .await
342 .expect("HttpRoute must be deleted");
343 let config = rx
344 .next()
345 .await
346 .expect("watch must not fail")
347 .expect("watch must return an initial config");
348 assert_eq!(
349 config.protocol,
350 Some(grpc::defaults::proxy_protocol_external())
351 );
352 })
353 .await;
354}
355#[tokio::test(flavor = "current_thread")]
356async fn external_workload_default_http_route() {
357 with_temp_ns(|client, ns| async move {
358 // Create an external workload object.
359 let ext_workload = create(&client, mk_external_workload(&ns, "wkld-1")).await;
360
361 tracing::trace!(
362 external_workload = %ext_workload.name_any(),
363 ip = ?ext_workload.spec.workload_ips.as_ref().unwrap()[0]
364 );
365
366 let mut rx = retry_watch_server(&client, &ns, &ext_workload.name_any()).await;
367 let config = rx
368 .next()
369 .await
370 .expect("watch must not fail")
371 .expect("watch must return an initial config");
372 tracing::trace!(?config);
373 assert_is_default_all_unauthenticated!(config);
374 assert_protocol_detect_external!(config);
375
376 let kind = config
377 .protocol
378 .as_ref()
379 .expect("must have proxy protocol")
380 .kind
381 .as_ref()
382 .expect("must have kind");
383 let routes = if let grpc::inbound::proxy_protocol::Kind::Detect(ref detect) = kind {
384 &detect.http_routes[..]
385 } else {
386 panic!("proxy protocol must be 'Detect'; actually got:\n{kind:#?}");
387 };
388
389 assert_eq!(routes.len(), 1);
390 let route_authzs = &routes[0].authorizations;
391 assert_eq!(route_authzs.len(), 0);
392 })
393 .await
394}
395
396fn mk_external_workload(ns: &str, name: &str) -> k8s::external_workload::ExternalWorkload {
397 k8s::external_workload::ExternalWorkload {
398 metadata: k8s::ObjectMeta {
399 namespace: Some(ns.into()),
400 name: Some(name.into()),
401 labels: Some(convert_args!(btreemap!(
402 "app" => "ext",
403 ))),
404 ..Default::default()
405 },
406 spec: k8s::external_workload::ExternalWorkloadSpec {
407 mesh_tls: k8s::external_workload::MeshTls {
408 identity: "some-identity".to_string(),
409 server_name: "some-sni".to_string(),
410 },
411 ports: Some(vec![k8s::external_workload::PortSpec {
412 name: Some("http".into()),
413 port: NonZeroU16::new(80).unwrap(),
414 protocol: Default::default(),
415 }]),
416 workload_ips: Some(vec![k8s::external_workload::WorkloadIP {
417 ip: "192.0.2.0".to_string(),
418 }]),
419 },
420 status: None,
421 }
422}
423
424async fn retry_watch_server(
425 client: &kube::Client,
426 ns: &str,
427 workload_name: &str,
428) -> tonic::Streaming<grpc::inbound::Server> {
429 let mut policy_api = grpc::InboundPolicyClient::port_forwarded(client).await;
430 loop {
431 match policy_api
432 .watch_port_for_external_workload(ns, workload_name, 80)
433 .await
434 {
435 Ok(rx) => return rx,
436 Err(error) => {
437 tracing::error!(
438 ?error,
439 ns,
440 workload_name,
441 "failed to watch policy for port 80"
442 );
443 time::sleep(Duration::from_secs(1)).await;
444 }
445 }
446 }
447}
448
449fn mk_http_server(ns: &str, name: &str) -> k8s::policy::Server {
450 k8s::policy::Server {
451 metadata: k8s::ObjectMeta {
452 namespace: Some(ns.to_string()),
453 name: Some(name.to_string()),
454 ..Default::default()
455 },
456 spec: k8s::policy::ServerSpec {
457 selector: k8s::policy::server::Selector::ExternalWorkload(
458 k8s::labels::Selector::default(),
459 ),
460 port: k8s::policy::server::Port::Name("http".to_string()),
461 proxy_protocol: Some(k8s::policy::server::ProxyProtocol::Http1),
462 },
463 }
464}
View as plain text