1from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Optional
2
3from ..config import Config
4from ..constants import Constants
5from .irbasemapping import IRBaseMapping
6from .irbuffer import IRBuffer
7from .ircors import IRCORS
8from .irfilter import IRFilter
9from .irgzip import IRGzip
10from .irhttpmapping import IRHTTPMapping
11from .iripallowdeny import IRIPAllowDeny
12from .irresource import IRResource
13from .irretrypolicy import IRRetryPolicy
14
15if TYPE_CHECKING:
16 from .ir import IR # pragma: no cover
17
18
19class IRAmbassador(IRResource):
20
21 # All the AModTransparentKeys are copied from the incoming Ambassador resource
22 # into the IRAmbassador object partway through IRAmbassador.finalize().
23 #
24 # PLEASE KEEP THIS LIST SORTED.
25
26 AModTransparentKeys: ClassVar = [
27 "add_linkerd_headers",
28 "admin_port",
29 "auth_enabled",
30 "allow_chunked_length",
31 "buffer_limit_bytes",
32 "circuit_breakers",
33 "cluster_idle_timeout_ms",
34 "cluster_max_connection_lifetime_ms",
35 "cluster_request_timeout_ms",
36 "debug_mode",
37 # Do not include defaults, that's handled manually in setup.
38 "default_label_domain",
39 "default_labels",
40 "diagnostics",
41 "enable_http10",
42 "enable_ipv4",
43 "enable_ipv6",
44 "envoy_log_format",
45 "envoy_log_path",
46 "envoy_log_type",
47 "forward_client_cert_details",
48 # Do not include envoy_validation_timeout; we let finalize() type-check it.
49 # Do not include ip_allow or ip_deny; we let finalize() type-check them.
50 "headers_with_underscores_action",
51 "keepalive",
52 "listener_idle_timeout_ms",
53 "liveness_probe",
54 "load_balancer",
55 "max_request_headers_kb",
56 "merge_slashes",
57 "reject_requests_with_escaped_slashes",
58 "preserve_external_request_id",
59 "proper_case",
60 "prune_unreachable_routes",
61 "readiness_probe",
62 "regex_max_size",
63 "regex_type",
64 "resolver",
65 "error_response_overrides",
66 "header_case_overrides",
67 "server_name",
68 "service_port",
69 "set_current_client_cert_details",
70 "statsd",
71 "strip_matching_host_port",
72 "suppress_envoy_headers",
73 "use_ambassador_namespace_for_service_resolution",
74 "use_proxy_proto",
75 "use_remote_address",
76 "x_forwarded_proto_redirect",
77 "xff_num_trusted_hops",
78 ]
79
80 service_port: int
81 default_label_domain: str
82
83 # Set up the default probes and such.
84 default_liveness_probe: ClassVar[Dict[str, str]] = {
85 "prefix": "/ambassador/v0/check_alive",
86 "rewrite": "/ambassador/v0/check_alive",
87 }
88
89 default_readiness_probe: ClassVar[Dict[str, str]] = {
90 "prefix": "/ambassador/v0/check_ready",
91 "rewrite": "/ambassador/v0/check_ready",
92 }
93
94 default_diagnostics: ClassVar[Dict[str, str]] = {
95 "prefix": "/ambassador/v0/",
96 "rewrite": "/ambassador/v0/",
97 }
98
99 # Set up the default Envoy validation timeout. This is deliberately chosen to be very large
100 # because the consequences of this timeout tripping are very bad. Ambassador basically ceases
101 # to function. It is far better to slow down as our configurations grow and give users a
102 # leading indicator that there is a scaling issue that needs to be dealt with than to
103 # suddenly and mysteriously stop functioning the day their configuration happens to become
104 # large enough to exceed this threshold.
105 default_validation_timeout: ClassVar[int] = 60
106
107 def __init__(
108 self,
109 ir: "IR",
110 aconf: Config,
111 rkey: str = "ir.ambassador",
112 kind: str = "IRAmbassador",
113 name: str = "ir.ambassador",
114 use_remote_address: bool = True,
115 **kwargs,
116 ) -> None:
117 # print("IRAmbassador __init__ (%s %s %s)" % (kind, name, kwargs))
118
119 super().__init__(
120 ir=ir,
121 aconf=aconf,
122 rkey=rkey,
123 kind=kind,
124 name=name,
125 service_port=Constants.SERVICE_PORT_HTTP,
126 admin_port=Constants.ADMIN_PORT,
127 auth_enabled=None,
128 enable_ipv6=False,
129 envoy_log_type="text",
130 envoy_log_path="/dev/fd/1",
131 envoy_log_format=None,
132 envoy_validation_timeout=IRAmbassador.default_validation_timeout,
133 enable_ipv4=True,
134 listener_idle_timeout_ms=None,
135 liveness_probe={"enabled": True},
136 readiness_probe={"enabled": True},
137 diagnostics={"enabled": True}, # TODO(lukeshu): In getambassador.io/v3alpha2, change
138 # the default to {"enabled": False}. See the related
139 # comment in crd_module.go.
140 use_proxy_proto=False,
141 enable_http10=False,
142 proper_case=False,
143 prune_unreachable_routes=True, # default True; can be updated in finalize()
144 use_remote_address=use_remote_address,
145 x_forwarded_proto_redirect=False,
146 load_balancer=None,
147 circuit_breakers=None,
148 xff_num_trusted_hops=0,
149 use_ambassador_namespace_for_service_resolution=False,
150 server_name="envoy",
151 debug_mode=False,
152 preserve_external_request_id=False,
153 max_request_headers_kb=None,
154 **kwargs,
155 )
156
157 self.ip_allow_deny: Optional[IRIPAllowDeny] = None
158 self._finalized = False
159
160 def setup(self, ir: "IR", aconf: Config) -> bool:
161 # The heavy lifting here is mostly in the finalize() method, so that when we do fallback
162 # lookups for TLS configuration stuff, the defaults are present in the Ambassador module.
163 #
164 # Of course, that means that we have to copy the defaults in here.
165
166 # We're interested in the 'ambassador' module from the Config, if any...
167 amod = aconf.get_module("ambassador")
168
169 if amod and "defaults" in amod:
170 self["defaults"] = amod["defaults"]
171
172 return True
173
174 def finalize(self, ir: "IR", aconf: Config) -> bool:
175 self._finalized = True
176
177 # Check TLSContext resources to see if we should enable TLS termination.
178 to_delete = []
179
180 for ctx_name, ctx in ir.tls_contexts.items():
181 if not ctx.resolve():
182 # Welllll this ain't good.
183 ctx.set_active(False)
184 to_delete.append(ctx_name)
185 elif ctx.get("hosts", None):
186 # This is a termination context
187 self.logger.debug(
188 "TLSContext %s is a termination context, enabling TLS termination" % ctx.name
189 )
190 self.service_port = Constants.SERVICE_PORT_HTTPS
191
192 if ctx.get("ca_cert", None):
193 # Client-side TLS is enabled.
194 self.logger.debug("TLSContext %s enables client certs!" % ctx.name)
195
196 for ctx_name in to_delete:
197 del ir.tls_contexts[ctx_name]
198
199 # After that, walk the AModTransparentKeys and copy all those things from the
200 # input into our IRAmbassador.
201 #
202 # Some of these will get overridden later, and some things not in AModTransparentKeys
203 # get handled manually below.
204 amod = aconf.get_module("ambassador")
205
206 if amod:
207 for key in IRAmbassador.AModTransparentKeys:
208 if key in amod:
209 # Override the default here.
210 self[key] = amod[key]
211
212 # If we have an envoy_validation_timeout...
213 if "envoy_validation_timeout" in amod:
214 # ...then set our timeout from it.
215 try:
216 self.envoy_validation_timeout = int(amod["envoy_validation_timeout"])
217 except ValueError:
218 self.post_error("envoy_validation_timeout must be an integer number of seconds")
219
220 # If we don't have a default label domain, force it to 'ambassador'.
221 if not self.get("default_label_domain"):
222 self.default_label_domain = "ambassador"
223
224 # Likewise, if we have no default labels, force an empty dict (it makes life easier
225 # on other modules).
226 if not self.get("default_labels"):
227 self.default_labels: Dict[str, Any] = {}
228
229 # Next up: diag port & services.
230 diag_service = "127.0.0.1:%d" % Constants.DIAG_PORT
231
232 for name, cur, dflt in [
233 ("liveness", self.liveness_probe, IRAmbassador.default_liveness_probe),
234 ("readiness", self.readiness_probe, IRAmbassador.default_readiness_probe),
235 ("diagnostics", self.diagnostics, IRAmbassador.default_diagnostics),
236 ]:
237 if cur and cur.get("enabled", False):
238 if not cur.get("prefix", None):
239 cur["prefix"] = dflt["prefix"]
240
241 if not cur.get("rewrite", None):
242 cur["rewrite"] = dflt["rewrite"]
243
244 if not cur.get("service", None):
245 cur["service"] = diag_service
246
247 if amod and ("enable_grpc_http11_bridge" in amod):
248 self.grpc_http11_bridge = IRFilter(
249 ir=ir,
250 aconf=aconf,
251 kind="ir.grpc_http1_bridge",
252 name="grpc_http1_bridge",
253 config=dict(),
254 )
255 self.grpc_http11_bridge.sourced_by(amod)
256 ir.save_filter(self.grpc_http11_bridge)
257
258 if amod and ("enable_grpc_web" in amod):
259 self.grpc_web = IRFilter(
260 ir=ir, aconf=aconf, kind="ir.grpc_web", name="grpc_web", config=dict()
261 )
262 self.grpc_web.sourced_by(amod)
263 ir.save_filter(self.grpc_web)
264
265 if amod and (grpc_stats := amod.get("grpc_stats")) is not None:
266 # grpc_stats = { 'all_methods': False} if amod.grpc_stats is None else amod.grpc_stats
267 # default config with safe values
268 config: Dict[str, Any] = {"enable_upstream_stats": False}
269
270 # Only one of config['individual_method_stats_allowlist'] or
271 # config['stats_for_all_methods'] can be set.
272 if "services" in grpc_stats:
273 config["individual_method_stats_allowlist"] = {"services": grpc_stats["services"]}
274 else:
275 config["stats_for_all_methods"] = bool(grpc_stats.get("all_methods", False))
276
277 if "upstream_stats" in grpc_stats:
278 config["enable_upstream_stats"] = bool(grpc_stats["upstream_stats"])
279
280 self.grpc_stats = IRFilter(
281 ir=ir, aconf=aconf, kind="ir.grpc_stats", name="grpc_stats", config=config
282 )
283 self.grpc_stats.sourced_by(amod)
284 ir.save_filter(self.grpc_stats)
285
286 if amod and ("lua_scripts" in amod):
287 self.lua_scripts = IRFilter(
288 ir=ir,
289 aconf=aconf,
290 kind="ir.lua_scripts",
291 name="lua_scripts",
292 config={"inline_code": amod.lua_scripts},
293 )
294 self.lua_scripts.sourced_by(amod)
295 ir.save_filter(self.lua_scripts)
296
297 # Gzip.
298 if amod and ("gzip" in amod):
299 self.gzip = IRGzip(ir=ir, aconf=aconf, location=self.location, **amod.gzip)
300
301 if self.gzip:
302 ir.save_filter(self.gzip)
303 else:
304 return False
305
306 # Buffer.
307 if amod and ("buffer" in amod):
308 self.buffer = IRBuffer(ir=ir, aconf=aconf, location=self.location, **amod.buffer)
309
310 if self.buffer:
311 ir.save_filter(self.buffer)
312 else:
313 return False
314
315 if amod and ("keepalive" in amod):
316 self.keepalive = amod["keepalive"]
317
318 # Finally, default CORS stuff.
319 if amod and ("cors" in amod):
320 self.cors = IRCORS(ir=ir, aconf=aconf, location=self.location, **amod.cors)
321
322 if self.cors:
323 self.cors.referenced_by(self)
324 else:
325 return False
326
327 if amod and ("retry_policy" in amod):
328 self.retry_policy = IRRetryPolicy(
329 ir=ir, aconf=aconf, location=self.location, **amod.retry_policy
330 )
331
332 if self.retry_policy:
333 self.retry_policy.referenced_by(self)
334 else:
335 return False
336
337 if amod:
338 if "ip_allow" in amod:
339 self.handle_ip_allow_deny(allow=True, principals=amod.ip_allow)
340
341 if "ip_deny" in amod:
342 self.handle_ip_allow_deny(allow=False, principals=amod.ip_deny)
343
344 if self.ip_allow_deny is not None:
345 ir.save_filter(self.ip_allow_deny)
346
347 # Clear this so it doesn't get duplicated when we dump the
348 # Ambassador module.
349 self.ip_allow_deny = None
350
351 if self.get("load_balancer", None) is not None:
352 if not IRHTTPMapping.validate_load_balancer(self["load_balancer"]):
353 self.post_error("Invalid load_balancer specified: {}".format(self["load_balancer"]))
354 return False
355
356 if self.get("circuit_breakers", None) is not None:
357 if not IRBaseMapping.validate_circuit_breakers(self.ir, self["circuit_breakers"]):
358 self.post_error(
359 "Invalid circuit_breakers specified: {}".format(self["circuit_breakers"])
360 )
361 return False
362
363 if self.get("envoy_log_type") == "text":
364 if self.get("envoy_log_format", None) is not None and not isinstance(
365 self.get("envoy_log_format"), str
366 ):
367 self.post_error(
368 "envoy_log_type 'text' requires a string in envoy_log_format: {}, invalidating...".format(
369 self.get("envoy_log_format")
370 )
371 )
372 self["envoy_log_format"] = ""
373 return False
374 elif self.get("envoy_log_type") == "json":
375 if self.get("envoy_log_format", None) is not None and not isinstance(
376 self.get("envoy_log_format"), dict
377 ):
378 self.post_error(
379 "envoy_log_type 'json' requires a dictionary in envoy_log_format: {}, invalidating...".format(
380 self.get("envoy_log_format")
381 )
382 )
383 self["envoy_log_format"] = {}
384 return False
385 else:
386 self.post_error(
387 "Invalid log_type specified: {}. Supported: json, text".format(
388 self.get("envoy_log_type")
389 )
390 )
391 return False
392
393 if self.get("forward_client_cert_details") is not None:
394 # https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto#envoy-v3-api-enum-extensions-filters-network-http-connection-manager-v3-httpconnectionmanager-forwardclientcertdetails
395 valid_values = (
396 "SANITIZE",
397 "FORWARD_ONLY",
398 "APPEND_FORWARD",
399 "SANITIZE_SET",
400 "ALWAYS_FORWARD_ONLY",
401 )
402
403 value = self.get("forward_client_cert_details")
404 if value not in valid_values:
405 self.post_error(
406 "'forward_client_cert_details' may not be set to '{}'; it may only be set to one of: {}".format(
407 value, ", ".join(valid_values)
408 )
409 )
410 return False
411
412 cert_details = self.get("set_current_client_cert_details")
413 if cert_details:
414 # https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto#envoy-v3-api-msg-extensions-filters-network-http-connection-manager-v3-httpconnectionmanager-setcurrentclientcertdetails
415 valid_keys = ("subject", "cert", "chain", "dns", "uri")
416
417 for k, v in cert_details.items():
418 if k not in valid_keys:
419 self.post_error(
420 "'set_current_client_cert_details' may not contain key '{}'; it may only contain keys: {}".format(
421 k, ", ".join(valid_keys)
422 )
423 )
424 return False
425
426 if v not in (True, False):
427 self.post_error(
428 "'set_current_client_cert_details' value for key '{}' may only be 'true' or 'false', not '{}'".format(
429 k, v
430 )
431 )
432 return False
433
434 return True
435
436 def add_mappings(self, ir: "IR", aconf: Config):
437 for name, cur in [
438 ("liveness", self.liveness_probe),
439 ("readiness", self.readiness_probe),
440 ("diagnostics", self.diagnostics),
441 ]:
442 if cur and cur.get("enabled", False):
443 name = "internal_%s_probe_mapping" % name
444 cache_key = "InternalMapping-v2-%s-default" % name
445
446 mapping = ir.cache_fetch(cache_key)
447
448 if mapping is not None:
449 # Cache hit. We know a priori that anything in the cache under a Mapping
450 # key must be an IRBaseMapping, but let's assert that rather than casting.
451 assert isinstance(mapping, IRBaseMapping)
452 else:
453 mapping = IRHTTPMapping(
454 ir,
455 aconf,
456 kind="InternalMapping",
457 rkey=self.rkey,
458 name=name,
459 location=self.location,
460 timeout_ms=10000,
461 hostname="*",
462 **cur,
463 )
464 mapping.referenced_by(self)
465
466 ir.add_mapping(aconf, mapping)
467
468 # if ir.edge_stack_allowed:
469 # if self.diagnostics and self.diagnostics.get("enabled", False):
470 # ir.logger.debug("adding mappings for Edge Policy Console")
471 # edge_stack_response_header = {"x-content-type-options": "nosniff"}
472 # mapping = IRHTTPMapping(ir, aconf, rkey=self.rkey, location=self.location,
473 # name="edgestack-direct-mapping",
474 # metadata_labels={"ambassador_diag_class": "private"},
475 # prefix="/edge_stack/",
476 # rewrite="/edge_stack_ui/edge_stack/",
477 # service="127.0.0.1:8500",
478 # precedence=1000000,
479 # timeout_ms=60000,
480 # hostname="*",
481 # add_response_headers=edge_stack_response_header)
482 # mapping.referenced_by(self)
483 # ir.add_mapping(aconf, mapping)
484
485 # mapping = IRHTTPMapping(ir, aconf, rkey=self.rkey, location=self.location,
486 # name="edgestack-fallback-mapping",
487 # metadata_labels={"ambassador_diag_class": "private"},
488 # prefix="^/$", prefix_regex=True,
489 # rewrite="/edge_stack_ui/",
490 # service="127.0.0.1:8500",
491 # precedence=-1000000,
492 # timeout_ms=60000,
493 # hostname="*",
494 # add_response_headers=edge_stack_response_header)
495 # mapping.referenced_by(self)
496 # ir.add_mapping(aconf, mapping)
497 # else:
498 # ir.logger.debug("diagnostics disabled, skipping mapping for Edge Policy Console")
499
500 def get_default_label_domain(self) -> str:
501 return self.default_label_domain
502
503 def get_default_labels(self, domain: Optional[str] = None) -> Optional[List]:
504 if not domain:
505 domain = self.get_default_label_domain()
506
507 domain_info = self.default_labels.get(domain, {})
508
509 self.logger.debug("default_labels info for %s: %s" % (domain, domain_info))
510
511 return domain_info.get("defaults")
512
513 def handle_ip_allow_deny(self, allow: bool, principals: List[str]) -> None:
514 """
515 Handle IP Allow/Deny. "allow" here states whether this is an
516 allow rule (True) or a deny rule (False); "principals" is a list
517 of IP addresses or CIDR ranges to allow or deny.
518
519 Only one of ip_allow or ip_deny can be set, so it's an error to
520 call this twice (even if "allow" is the same for both calls).
521
522 :param allow: True for an ALLOW rule, False for a DENY rule
523 :param principals: list of IP addresses or CIDR ranges to match
524 """
525
526 if self.get("ip_allow_deny") is not None:
527 self.post_error("ip_allow and ip_deny may not both be set")
528 return
529
530 ipa = IRIPAllowDeny(
531 self.ir,
532 self.ir.aconf,
533 rkey=self.rkey,
534 parent=self,
535 action="ALLOW" if allow else "DENY",
536 principals=principals,
537 )
538
539 if ipa:
540 self["ip_allow_deny"] = ipa
View as plain text