1import base64
2import logging
3from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional
4
5from ..config import Config
6from ..utils import SavedSecret
7from .irresource import IRResource
8
9if TYPE_CHECKING:
10 from .ir import IR # pragma: no cover
11 from .irtls import IRAmbassadorTLS # pragma: no cover
12
13
14class IRTLSContext(IRResource):
15 CertKeys: ClassVar = {
16 "secret",
17 "cert_chain_file",
18 "private_key_file",
19 "ca_secret",
20 "cacert_chain_file",
21 "crl_secret",
22 "crl_file",
23 }
24
25 AllowedKeys: ClassVar = {
26 "_ambassador_enabled",
27 "_legacy",
28 "alpn_protocols",
29 "cert_required",
30 "cipher_suites",
31 "ecdh_curves",
32 "hosts",
33 "max_tls_version",
34 "min_tls_version",
35 "redirect_cleartext_from",
36 "secret_namespacing",
37 "sni",
38 }
39
40 AllowedTLSVersions = ["v1.0", "v1.1", "v1.2", "v1.3"]
41
42 name: str
43 hosts: Optional[List[str]]
44 alpn_protocols: Optional[str]
45 cert_required: Optional[bool]
46 min_tls_version: Optional[str]
47 max_tls_version: Optional[str]
48 cipher_suites: Optional[str]
49 ecdh_curves: Optional[str]
50 redirect_cleartext_from: Optional[int]
51 secret_namespacing: Optional[bool]
52 secret_info: dict
53 sni: Optional[str]
54
55 is_fallback: bool
56
57 _ambassador_enabled: bool
58 _legacy: bool
59
60 def __init__(
61 self,
62 ir: "IR",
63 aconf: Config,
64 rkey: str, # REQUIRED
65 name: str, # REQUIRED
66 location: str, # REQUIRED
67 namespace: Optional[str] = None,
68 metadata_labels: Optional[Dict[str, str]] = None,
69 kind: str = "IRTLSContext",
70 apiVersion: str = "getambassador.io/v3alpha1",
71 is_fallback: Optional[bool] = False,
72 **kwargs,
73 ) -> None:
74
75 new_args = {
76 x: kwargs[x]
77 for x in kwargs.keys()
78 if x in IRTLSContext.AllowedKeys.union(IRTLSContext.CertKeys)
79 }
80
81 super().__init__(
82 ir=ir,
83 aconf=aconf,
84 rkey=rkey,
85 location=location,
86 kind=kind,
87 name=name,
88 namespace=namespace,
89 metadata_labels=metadata_labels,
90 is_fallback=is_fallback,
91 apiVersion=apiVersion,
92 **new_args,
93 )
94
95 def pretty(self) -> str:
96 secret_name = self.secret_info.get("secret", "-no secret-")
97 hoststr = getattr(self, "hosts", "-any-")
98 fbstr = " (fallback)" if self.is_fallback else ""
99
100 rcf = self.get("redirect_cleartext_from", None)
101 rcfstr = f" rcf {rcf}" if (rcf is not None) else ""
102
103 return f"<IRTLSContext {self.name}.{self.namespace}{rcfstr}{fbstr}: hosts {hoststr} secret {secret_name}>"
104
105 def setup(self, ir: "IR", aconf: Config) -> bool:
106 if not self.get("_ambassador_enabled", False):
107 spec_count = 0
108 errors = 0
109
110 if self.get("secret", None):
111 spec_count += 1
112
113 if self.get("cert_chain_file", None):
114 spec_count += 1
115
116 if not self.get("private_key_file", None):
117 err_msg = f"TLSContext {self.name}: 'cert_chain_file' requires 'private_key_file' as well"
118
119 self.post_error(err_msg)
120 errors += 1
121
122 if spec_count == 2:
123 err_msg = f"TLSContext {self.name}: exactly one of 'secret' and 'cert_chain_file' must be present"
124
125 self.post_error(err_msg)
126 errors += 1
127
128 if errors:
129 return False
130
131 # self.sourced_by(config)
132 # self.referenced_by(config)
133
134 # Assume that we have no redirect_cleartext_from...
135 rcf = self.get("redirect_cleartext_from", None)
136
137 if rcf is not None:
138 try:
139 self.redirect_cleartext_from = int(rcf)
140 except ValueError:
141 err_msg = f"TLSContext {self.name}: redirect_cleartext_from must be a port number rather than '{rcf}'"
142 self.post_error(err_msg)
143 self.redirect_cleartext_from = None
144
145 # Finally, move cert keys into secret_info.
146 self.secret_info = {}
147
148 for key in IRTLSContext.CertKeys:
149 if key in self:
150 self.secret_info[key] = self.pop(key)
151
152 ir.logger.debug(f"IRTLSContext setup good: {self.pretty()}")
153
154 return True
155
156 def resolve_secret(self, secret_name: str) -> SavedSecret:
157 # Assume that we need to look in whichever namespace the TLSContext itself is in...
158 namespace = self.namespace
159
160 # You can't just always allow '.' in a secret name to span namespaces, or you end up with
161 # https://github.com/datawire/ambassador/issues/1255, which is particularly problematic
162 # because (https://github.com/datawire/ambassador/issues/1475) Istio likes to use '.' in
163 # mTLS secret names. So we default to allowing the '.' as a namespace separator, but
164 # you can set secret_namespacing to False in a TLSContext or tls_secret_namespacing False
165 # in the Ambassador module's defaults to prevent that.
166
167 secret_namespacing = self.lookup(
168 "secret_namespacing", True, default_key="tls_secret_namespacing"
169 )
170
171 self.ir.logger.debug(
172 f"TLSContext.resolve_secret {secret_name}, namespace {namespace}: namespacing is {secret_namespacing}"
173 )
174
175 if "." in secret_name and secret_namespacing:
176 secret_name, namespace = secret_name.rsplit(".", 1)
177
178 return self.ir.resolve_secret(self, secret_name, namespace)
179
180 def resolve(self) -> bool:
181 if self.get("_ambassador_enabled", False):
182 self.ir.logger.debug("IRTLSContext skipping resolution of null context")
183 return True
184
185 # is_valid determines if the TLS context is valid
186 is_valid = False
187
188 # If redirect_cleartext_from or alpn_protocols is specified, the TLS Context is
189 # valid anyway, even if secret config is invalid
190 if self.get("redirect_cleartext_from", False) or self.get("alpn_protocols", False):
191 is_valid = True
192
193 # If we don't have secret info, it's worth posting an error.
194 if not self.secret_info:
195 self.post_error(
196 "TLSContext %s has no certificate information at all?" % self.name,
197 log_level=logging.DEBUG,
198 )
199
200 self.ir.logger.debug("resolve_secrets working on: %s" % self.as_json())
201
202 # OK. Do we have a secret name?
203 secret_name = self.secret_info.get("secret")
204 secret_valid = True
205
206 if secret_name:
207 # Yes. Try loading it. This always returns a SavedSecret, so that our caller
208 # has access to the name and namespace. The SavedSecret will evaluate non-True
209 # if we found no cert though.
210 ss = self.resolve_secret(secret_name)
211
212 self.ir.logger.debug("resolve_secrets: IR returned secret %s as %s" % (secret_name, ss))
213
214 if not ss:
215 # This is definitively an error: they mentioned a secret, it can't be loaded,
216 # post an error.
217 self.post_error(
218 "TLSContext %s found no certificate in %s, ignoring..." % (self.name, ss.name)
219 )
220 self.secret_info.pop("secret")
221 secret_valid = False
222 else:
223 # If they only gave a public key, that's an error
224 if not ss.key_path:
225 self.post_error(
226 "TLSContext %s found no private key in %s" % (self.name, ss.name)
227 )
228 return False
229
230 # So far, so good.
231 self.ir.logger.debug("TLSContext %s saved secret %s" % (self.name, ss.name))
232
233 # Update paths for this cert.
234 self.secret_info["cert_chain_file"] = ss.cert_path
235 self.secret_info["private_key_file"] = ss.key_path
236
237 if ss.root_cert_path:
238 self.secret_info["cacert_chain_file"] = ss.root_cert_path
239
240 self.ir.logger.debug(
241 "TLSContext - successfully processed the cert_chain_file, private_key_file, and cacert_chain_file: %s"
242 % self.secret_info
243 )
244
245 # OK. Repeat for the crl_secret.
246 crl_secret = self.secret_info.get("crl_secret")
247 if crl_secret:
248 # They gave a secret name for the certificate revocation list. Try loading it.
249 crls = self.resolve_secret(crl_secret)
250
251 self.ir.logger.debug(
252 "resolve_secrets: IR returned secret %s as %s" % (crl_secret, crls)
253 )
254
255 if not crls:
256 # This is definitively an error: they mentioned a secret, it can't be loaded,
257 # give up.
258 self.post_error(
259 "TLSContext %s found no certificate revocation list in %s"
260 % (self.name, crls.name)
261 )
262 secret_valid = False
263 else:
264 self.ir.logger.debug(
265 "TLSContext %s saved certificate revocation list secret %s"
266 % (self.name, crls.name)
267 )
268 self.secret_info["crl_file"] = crls.user_path
269
270 # OK. Repeat for the ca_secret_name.
271 ca_secret_name = self.secret_info.get("ca_secret")
272
273 if ca_secret_name:
274 if not self.secret_info.get("cert_chain_file"):
275 # DUPLICATED BELOW: This is an error: validation without termination isn't meaningful.
276 # (This is duplicated for the case where they gave a validation path.)
277 self.post_error(
278 "TLSContext %s cannot validate client certs without TLS termination" % self.name
279 )
280 return False
281
282 # They gave a secret name for the validation cert. Try loading it.
283 ss = self.resolve_secret(ca_secret_name)
284
285 self.ir.logger.debug(
286 "resolve_secrets: IR returned secret %s as %s" % (ca_secret_name, ss)
287 )
288
289 if not ss:
290 # This is definitively an error: they mentioned a secret, it can't be loaded,
291 # give up.
292 self.post_error(
293 "TLSContext %s found no validation certificate in %s" % (self.name, ss.name)
294 )
295 secret_valid = False
296 else:
297 # Validation certs don't need the private key, but it's not an error if they gave
298 # one. We're good to go here.
299 self.ir.logger.debug("TLSContext %s saved CA secret %s" % (self.name, ss.name))
300 self.secret_info["cacert_chain_file"] = ss.cert_path
301
302 # While we're here, did they set cert_required _in the secret_?
303 if ss.cert_data:
304 cert_required = ss.cert_data.get("cert_required")
305
306 if cert_required is not None:
307 decoded = base64.b64decode(cert_required).decode("utf-8").lower() == "true"
308
309 # cert_required is at toplevel, _not_ in secret_info!
310 self["cert_required"] = decoded
311 else:
312 # No secret is named; did they provide a file location instead?
313 if self.secret_info.get("cacert_chain_file") and not self.secret_info.get(
314 "cert_chain_file"
315 ):
316 # DUPLICATED ABOVE: This is an error: validation without termination isn't meaningful.
317 # (This is duplicated for the case where they gave a validation secret.)
318 self.post_error(
319 "TLSContext %s cannot validate client certs without TLS termination" % self.name
320 )
321 return False
322
323 # If the secret has been invalidated above, then we do not need to check for paths down under.
324 # We can return whether the TLS Context is valid or not.
325 if not secret_valid:
326 return is_valid
327
328 # OK. Check paths.
329 errors = 0
330
331 # self.ir.logger.debug("resolve_secrets before path checks: %s" % self.as_json())
332 for key in ["cert_chain_file", "private_key_file", "cacert_chain_file", "crl_file"]:
333 path = self.secret_info.get(key, None)
334
335 if path:
336 fc = getattr(self.ir, "file_checker")
337 if not fc(path):
338 self.post_error("TLSContext %s found no %s '%s'" % (self.name, key, path))
339 errors += 1
340 elif (not (key == "cacert_chain_file" or key == "crl_file")) and self.get(
341 "hosts", None
342 ):
343 self.post_error("TLSContext %s is missing %s" % (self.name, key))
344 errors += 1
345
346 if errors > 0:
347 return False
348
349 return True
350
351 def has_secret(self) -> bool:
352 # Safely verify that self.secret_info['secret'] exists -- in other words, verify
353 # that this IRTLSContext is based on a Secret we load from elsewhere, rather than
354 # on files in the filesystem.
355 si = self.get("secret_info", {})
356
357 return "secret" in si
358
359 def secret_name(self) -> Optional[str]:
360 # Return the name of the Secret we're based on, or None if we're based on files
361 # in the filesystem.
362 #
363 # XXX Currently this implies a _Kubernetes_ Secret, and we might have to change
364 # this later.
365
366 if self.has_secret():
367 return self.secret_info["secret"]
368 else:
369 return None
370
371 def set_secret_name(self, secret_name: str) -> None:
372 # Set the name of the Secret we're based on.
373 self.secret_info["secret"] = secret_name
374
375 @classmethod
376 def null_context(cls, ir: "IR") -> "IRTLSContext":
377 ctx = ir.get_tls_context("no-cert-upstream")
378
379 if not ctx:
380 ctx = IRTLSContext(
381 ir,
382 ir.aconf,
383 rkey="ir.no-cert-upstream",
384 name="no-cert-upstream",
385 location="ir.no-cert-upstream",
386 kind="null-TLS-context",
387 _ambassador_enabled=True,
388 )
389
390 ir.save_tls_context(ctx)
391
392 return ctx
393
394 @classmethod
395 def from_legacy(
396 cls,
397 ir: "IR",
398 name: str,
399 rkey: str,
400 location: str,
401 cert: "IRAmbassadorTLS",
402 termination: bool,
403 validation_ca: Optional["IRAmbassadorTLS"],
404 ) -> "IRTLSContext":
405 """
406 Create an IRTLSContext from a legacy TLS-module style definition.
407
408 'cert' is the TLS certificate that we'll offer to our peer -- for a termination
409 context, this is our server cert, and for an origination context, it's our client
410 cert.
411
412 For termination contexts, 'validation_ca' may also be provided. It's the TLS
413 certificate that we'll use to validate the certificates our clients offer. Note
414 that no private key is needed or supported.
415
416 :param ir: IR in play
417 :param name: name for the newly-created context
418 :param rkey: rkey for the newly-created context
419 :param location: location for the newly-created context
420 :param cert: information about the cert to present to the peer
421 :param termination: is this a termination context?
422 :param validation_ca: information about how we'll validate the peer's cert
423 :return: newly-created IRTLSContext
424 """
425 new_args = {}
426
427 for key in [
428 "secret",
429 "cert_chain_file",
430 "private_key_file",
431 "alpn_protocols",
432 "redirect_cleartext_from",
433 ]:
434 value = cert.get(key, None)
435
436 if value:
437 new_args[key] = value
438
439 if (
440 ("secret" not in new_args)
441 and ("cert_chain_file" not in new_args)
442 and ("private_key_file" not in new_args)
443 ):
444 # Assume they want the 'ambassador-certs' secret.
445 new_args["secret"] = "ambassador-certs"
446
447 if termination:
448 new_args["hosts"] = ["*"]
449
450 if validation_ca and validation_ca.get("enabled", True):
451 for key in ["secret", "cacert_chain_file", "cert_required"]:
452 value = validation_ca.get(key, None)
453
454 if value:
455 if key == "secret":
456 new_args["ca_secret"] = value
457 else:
458 new_args[key] = value
459
460 if ("ca_secret" not in new_args) and ("cacert_chain_file" not in new_args):
461 # Assume they want the 'ambassador-cacert' secret.
462 new_args["secret"] = "ambassador-cacert"
463
464 ctx = IRTLSContext(
465 ir,
466 ir.aconf,
467 rkey=rkey,
468 name=name,
469 location=location,
470 kind="synthesized-TLS-context",
471 _legacy=True,
472 **new_args,
473 )
474
475 return ctx
476
477
478class TLSContextFactory:
479 @classmethod
480 def load_all(cls, ir: "IR", aconf: Config) -> None:
481 assert ir
482
483 # Save TLS contexts from the aconf into the IR. Note that the contexts in the aconf
484 # are just ACResources; they need to be turned into IRTLSContexts.
485 tls_contexts = aconf.get_config("tls_contexts")
486
487 if tls_contexts is not None:
488 for config in tls_contexts.values():
489 ctx = IRTLSContext(ir, aconf, **config)
490
491 if ctx.is_active():
492 ctx.referenced_by(config)
493 ctx.sourced_by(config)
494
495 ir.save_tls_context(ctx)
View as plain text