...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ir/irtlscontext.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador/ir

     1import base64
     2import logging
     3from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional
     4
     5from ..config import Config
     6from ..utils import SavedSecret
     7from .irresource import IRResource
     8
     9if TYPE_CHECKING:
    10    from .ir import IR  # pragma: no cover
    11    from .irtls import IRAmbassadorTLS  # pragma: no cover
    12
    13
class IRTLSContext(IRResource):
    """
    IR resource describing a single TLS context.

    Certificate-related settings are accepted as keyword arguments at
    construction time; setup() later moves them into `secret_info`, and
    resolve() turns any named secrets into file paths.
    """

    # Keys that describe certificate material. __init__ accepts them as keyword
    # arguments, and setup() moves any that are present into self.secret_info.
    CertKeys: ClassVar = {
        "secret",
        "cert_chain_file",
        "private_key_file",
        "ca_secret",
        "cacert_chain_file",
        "crl_secret",
        "crl_file",
    }

    # Non-certificate keyword arguments accepted by __init__. Keyword arguments
    # that are in neither AllowedKeys nor CertKeys are silently dropped.
    AllowedKeys: ClassVar = {
        "_ambassador_enabled",
        "_legacy",
        "alpn_protocols",
        "cert_required",
        "cipher_suites",
        "ecdh_curves",
        "hosts",
        "max_tls_version",
        "min_tls_version",
        "redirect_cleartext_from",
        "secret_namespacing",
        "sni",
    }

    # NOTE(review): not referenced in the visible part of this file; presumably
    # the values permitted for min_tls_version / max_tls_version -- confirm
    # against the callers.
    AllowedTLSVersions = ["v1.0", "v1.1", "v1.2", "v1.3"]

    name: str
    hosts: Optional[List[str]]
    alpn_protocols: Optional[str]
    cert_required: Optional[bool]
    min_tls_version: Optional[str]
    max_tls_version: Optional[str]
    cipher_suites: Optional[str]
    ecdh_curves: Optional[str]
    redirect_cleartext_from: Optional[int]
    secret_namespacing: Optional[bool]
    # Populated by setup() with whichever CertKeys were supplied.
    secret_info: dict
    sni: Optional[str]

    is_fallback: bool

    # Set to True by null_context() for the synthesized "no-cert-upstream" context.
    _ambassador_enabled: bool
    # Set to True by from_legacy() for contexts synthesized from legacy definitions.
    _legacy: bool
    59
    def __init__(
        self,
        ir: "IR",
        aconf: Config,
        rkey: str,  # REQUIRED
        name: str,  # REQUIRED
        location: str,  # REQUIRED
        namespace: Optional[str] = None,
        metadata_labels: Optional[Dict[str, str]] = None,
        kind: str = "IRTLSContext",
        apiVersion: str = "getambassador.io/v3alpha1",
        is_fallback: Optional[bool] = False,
        **kwargs,
    ) -> None:
        """
        Build a TLS context, forwarding only recognized settings to IRResource.

        Keyword arguments named in neither AllowedKeys nor CertKeys are
        discarded rather than rejected; everything else is passed through to
        the IRResource constructor unchanged.
        """
        # Work out the full set of accepted keys once, then filter against it.
        recognized = IRTLSContext.AllowedKeys | IRTLSContext.CertKeys
        accepted = {key: value for key, value in kwargs.items() if key in recognized}

        super().__init__(
            ir=ir,
            aconf=aconf,
            rkey=rkey,
            location=location,
            kind=kind,
            name=name,
            namespace=namespace,
            metadata_labels=metadata_labels,
            is_fallback=is_fallback,
            apiVersion=apiVersion,
            **accepted,
        )
    94
    def pretty(self) -> str:
        """Return a one-line, human-readable summary of this context for logging."""
        secret_label = self.secret_info.get("secret", "-no secret-")
        host_label = getattr(self, "hosts", "-any-")

        fallback_tag = ""
        if self.is_fallback:
            fallback_tag = " (fallback)"

        # Only mention the cleartext redirect when one is actually configured.
        redirect_port = self.get("redirect_cleartext_from", None)
        redirect_tag = "" if redirect_port is None else f" rcf {redirect_port}"

        return (
            f"<IRTLSContext {self.name}.{self.namespace}{redirect_tag}{fallback_tag}: "
            f"hosts {host_label} secret {secret_label}>"
        )
   104
   105    def setup(self, ir: "IR", aconf: Config) -> bool:
   106        if not self.get("_ambassador_enabled", False):
   107            spec_count = 0
   108            errors = 0
   109
   110            if self.get("secret", None):
   111                spec_count += 1
   112
   113            if self.get("cert_chain_file", None):
   114                spec_count += 1
   115
   116                if not self.get("private_key_file", None):
   117                    err_msg = f"TLSContext {self.name}: 'cert_chain_file' requires 'private_key_file' as well"
   118
   119                    self.post_error(err_msg)
   120                    errors += 1
   121
   122            if spec_count == 2:
   123                err_msg = f"TLSContext {self.name}: exactly one of 'secret' and 'cert_chain_file' must be present"
   124
   125                self.post_error(err_msg)
   126                errors += 1
   127
   128            if errors:
   129                return False
   130
   131        # self.sourced_by(config)
   132        # self.referenced_by(config)
   133
   134        # Assume that we have no redirect_cleartext_from...
   135        rcf = self.get("redirect_cleartext_from", None)
   136
   137        if rcf is not None:
   138            try:
   139                self.redirect_cleartext_from = int(rcf)
   140            except ValueError:
   141                err_msg = f"TLSContext {self.name}: redirect_cleartext_from must be a port number rather than '{rcf}'"
   142                self.post_error(err_msg)
   143                self.redirect_cleartext_from = None
   144
   145        # Finally, move cert keys into secret_info.
   146        self.secret_info = {}
   147
   148        for key in IRTLSContext.CertKeys:
   149            if key in self:
   150                self.secret_info[key] = self.pop(key)
   151
   152        ir.logger.debug(f"IRTLSContext setup good: {self.pretty()}")
   153
   154        return True
   155
   156    def resolve_secret(self, secret_name: str) -> SavedSecret:
   157        # Assume that we need to look in whichever namespace the TLSContext itself is in...
   158        namespace = self.namespace
   159
   160        # You can't just always allow '.' in a secret name to span namespaces, or you end up with
   161        # https://github.com/datawire/ambassador/issues/1255, which is particularly problematic
   162        # because (https://github.com/datawire/ambassador/issues/1475) Istio likes to use '.' in
   163        # mTLS secret names. So we default to allowing the '.' as a namespace separator, but
   164        # you can set secret_namespacing to False in a TLSContext or tls_secret_namespacing False
   165        # in the Ambassador module's defaults to prevent that.
   166
   167        secret_namespacing = self.lookup(
   168            "secret_namespacing", True, default_key="tls_secret_namespacing"
   169        )
   170
   171        self.ir.logger.debug(
   172            f"TLSContext.resolve_secret {secret_name}, namespace {namespace}: namespacing is {secret_namespacing}"
   173        )
   174
   175        if "." in secret_name and secret_namespacing:
   176            secret_name, namespace = secret_name.rsplit(".", 1)
   177
   178        return self.ir.resolve_secret(self, secret_name, namespace)
   179
    def resolve(self) -> bool:
        """
        Resolve the secrets named in `secret_info` into file paths and check them.

        Contexts marked `_ambassador_enabled` are skipped and reported as good.
        Otherwise the 'secret', 'crl_secret' and 'ca_secret' entries (when
        present) are loaded via resolve_secret(), and the resulting paths are
        written back into `secret_info`.

        :return: True if the context is usable. False is returned immediately
            when the main secret has no private key, or when client-cert
            validation is requested without a termination cert. If any named
            secret could not be loaded, the result is True only when
            'redirect_cleartext_from' or 'alpn_protocols' is set. Otherwise the
            result is True only if every configured path passes the IR's file
            checker and, when 'hosts' is set, both 'cert_chain_file' and
            'private_key_file' are present.
        """
        if self.get("_ambassador_enabled", False):
            self.ir.logger.debug("IRTLSContext skipping resolution of null context")
            return True

        # is_valid determines if the TLS context is valid
        is_valid = False

        # If redirect_cleartext_from or alpn_protocols is specified, the TLS Context is
        # valid anyway, even if secret config is invalid
        if self.get("redirect_cleartext_from", False) or self.get("alpn_protocols", False):
            is_valid = True

        # If we don't have secret info, it's worth posting an error.
        # (It is posted at DEBUG level, and processing continues regardless.)
        if not self.secret_info:
            self.post_error(
                "TLSContext %s has no certificate information at all?" % self.name,
                log_level=logging.DEBUG,
            )

        self.ir.logger.debug("resolve_secrets working on: %s" % self.as_json())

        # OK. Do we have a secret name?
        secret_name = self.secret_info.get("secret")
        # secret_valid flips to False as soon as any named secret fails to load.
        secret_valid = True

        if secret_name:
            # Yes. Try loading it. This always returns a SavedSecret, so that our caller
            # has access to the name and namespace. The SavedSecret will evaluate non-True
            # if we found no cert though.
            ss = self.resolve_secret(secret_name)

            self.ir.logger.debug("resolve_secrets: IR returned secret %s as %s" % (secret_name, ss))

            if not ss:
                # This is definitively an error: they mentioned a secret, it can't be loaded,
                # post an error.
                self.post_error(
                    "TLSContext %s found no certificate in %s, ignoring..." % (self.name, ss.name)
                )
                # Forget the unusable secret name, but keep going so that the other
                # secrets are still examined.
                self.secret_info.pop("secret")
                secret_valid = False
            else:
                # If they only gave a public key, that's an error
                if not ss.key_path:
                    self.post_error(
                        "TLSContext %s found no private key in %s" % (self.name, ss.name)
                    )
                    return False

                # So far, so good.
                self.ir.logger.debug("TLSContext %s saved secret %s" % (self.name, ss.name))

                # Update paths for this cert.
                self.secret_info["cert_chain_file"] = ss.cert_path
                self.secret_info["private_key_file"] = ss.key_path

                if ss.root_cert_path:
                    self.secret_info["cacert_chain_file"] = ss.root_cert_path

        # NOTE: this message is logged whether or not a secret was named, and also
        # when the named secret could not be loaded above.
        self.ir.logger.debug(
            "TLSContext - successfully processed the cert_chain_file, private_key_file, and cacert_chain_file: %s"
            % self.secret_info
        )

        # OK. Repeat for the crl_secret.
        crl_secret = self.secret_info.get("crl_secret")
        if crl_secret:
            # They gave a secret name for the certificate revocation list. Try loading it.
            crls = self.resolve_secret(crl_secret)

            self.ir.logger.debug(
                "resolve_secrets: IR returned secret %s as %s" % (crl_secret, crls)
            )

            if not crls:
                # This is definitively an error: they mentioned a secret, it can't be loaded,
                # give up.
                self.post_error(
                    "TLSContext %s found no certificate revocation list in %s"
                    % (self.name, crls.name)
                )
                secret_valid = False
            else:
                self.ir.logger.debug(
                    "TLSContext %s saved certificate revocation list secret %s"
                    % (self.name, crls.name)
                )
                self.secret_info["crl_file"] = crls.user_path

        # OK. Repeat for the ca_secret_name.
        ca_secret_name = self.secret_info.get("ca_secret")

        if ca_secret_name:
            if not self.secret_info.get("cert_chain_file"):
                # DUPLICATED BELOW: This is an error: validation without termination isn't meaningful.
                # (This is duplicated for the case where they gave a validation path.)
                self.post_error(
                    "TLSContext %s cannot validate client certs without TLS termination" % self.name
                )
                return False

            # They gave a secret name for the validation cert. Try loading it.
            ss = self.resolve_secret(ca_secret_name)

            self.ir.logger.debug(
                "resolve_secrets: IR returned secret %s as %s" % (ca_secret_name, ss)
            )

            if not ss:
                # This is definitively an error: they mentioned a secret, it can't be loaded,
                # give up.
                self.post_error(
                    "TLSContext %s found no validation certificate in %s" % (self.name, ss.name)
                )
                secret_valid = False
            else:
                # Validation certs don't need the private key, but it's not an error if they gave
                # one. We're good to go here.
                self.ir.logger.debug("TLSContext %s saved CA secret %s" % (self.name, ss.name))
                # This overwrites any cacert_chain_file taken from the main secret's
                # root_cert_path above.
                self.secret_info["cacert_chain_file"] = ss.cert_path

                # While we're here, did they set cert_required _in the secret_?
                if ss.cert_data:
                    cert_required = ss.cert_data.get("cert_required")

                    if cert_required is not None:
                        # The value is base64-encoded; only a case-insensitive "true"
                        # turns cert_required on.
                        decoded = base64.b64decode(cert_required).decode("utf-8").lower() == "true"

                        # cert_required is at toplevel, _not_ in secret_info!
                        self["cert_required"] = decoded
        else:
            # No secret is named; did they provide a file location instead?
            if self.secret_info.get("cacert_chain_file") and not self.secret_info.get(
                "cert_chain_file"
            ):
                # DUPLICATED ABOVE: This is an error: validation without termination isn't meaningful.
                # (This is duplicated for the case where they gave a validation secret.)
                self.post_error(
                    "TLSContext %s cannot validate client certs without TLS termination" % self.name
                )
                return False

        # If the secret has been invalidated above, then we do not need to check for paths down under.
        # We can return whether the TLS Context is valid or not.
        if not secret_valid:
            return is_valid

        # OK. Check paths.
        errors = 0

        # self.ir.logger.debug("resolve_secrets before path checks: %s" % self.as_json())
        for key in ["cert_chain_file", "private_key_file", "cacert_chain_file", "crl_file"]:
            path = self.secret_info.get(key, None)

            if path:
                # Every configured path must be accepted by the IR's file checker.
                fc = getattr(self.ir, "file_checker")
                if not fc(path):
                    self.post_error("TLSContext %s found no %s '%s'" % (self.name, key, path))
                    errors += 1
            elif (not (key == "cacert_chain_file" or key == "crl_file")) and self.get(
                "hosts", None
            ):
                # A context with hosts must have both a cert chain and a private key;
                # the CA cert and the CRL stay optional.
                self.post_error("TLSContext %s is missing %s" % (self.name, key))
                errors += 1

        if errors > 0:
            return False

        return True
   350
   351    def has_secret(self) -> bool:
   352        # Safely verify that self.secret_info['secret'] exists -- in other words, verify
   353        # that this IRTLSContext is based on a Secret we load from elsewhere, rather than
   354        # on files in the filesystem.
   355        si = self.get("secret_info", {})
   356
   357        return "secret" in si
   358
   359    def secret_name(self) -> Optional[str]:
   360        # Return the name of the Secret we're based on, or None if we're based on files
   361        # in the filesystem.
   362        #
   363        # XXX Currently this implies a _Kubernetes_ Secret, and we might have to change
   364        # this later.
   365
   366        if self.has_secret():
   367            return self.secret_info["secret"]
   368        else:
   369            return None
   370
   371    def set_secret_name(self, secret_name: str) -> None:
   372        # Set the name of the Secret we're based on.
   373        self.secret_info["secret"] = secret_name
   374
   375    @classmethod
   376    def null_context(cls, ir: "IR") -> "IRTLSContext":
   377        ctx = ir.get_tls_context("no-cert-upstream")
   378
   379        if not ctx:
   380            ctx = IRTLSContext(
   381                ir,
   382                ir.aconf,
   383                rkey="ir.no-cert-upstream",
   384                name="no-cert-upstream",
   385                location="ir.no-cert-upstream",
   386                kind="null-TLS-context",
   387                _ambassador_enabled=True,
   388            )
   389
   390            ir.save_tls_context(ctx)
   391
   392        return ctx
   393
   394    @classmethod
   395    def from_legacy(
   396        cls,
   397        ir: "IR",
   398        name: str,
   399        rkey: str,
   400        location: str,
   401        cert: "IRAmbassadorTLS",
   402        termination: bool,
   403        validation_ca: Optional["IRAmbassadorTLS"],
   404    ) -> "IRTLSContext":
   405        """
   406        Create an IRTLSContext from a legacy TLS-module style definition.
   407
   408        'cert' is the TLS certificate that we'll offer to our peer -- for a termination
   409        context, this is our server cert, and for an origination context, it's our client
   410        cert.
   411
   412        For termination contexts, 'validation_ca' may also be provided. It's the TLS
   413        certificate that we'll use to validate the certificates our clients offer. Note
   414        that no private key is needed or supported.
   415
   416        :param ir: IR in play
   417        :param name: name for the newly-created context
   418        :param rkey: rkey for the newly-created context
   419        :param location: location for the newly-created context
   420        :param cert: information about the cert to present to the peer
   421        :param termination: is this a termination context?
   422        :param validation_ca: information about how we'll validate the peer's cert
   423        :return: newly-created IRTLSContext
   424        """
   425        new_args = {}
   426
   427        for key in [
   428            "secret",
   429            "cert_chain_file",
   430            "private_key_file",
   431            "alpn_protocols",
   432            "redirect_cleartext_from",
   433        ]:
   434            value = cert.get(key, None)
   435
   436            if value:
   437                new_args[key] = value
   438
   439        if (
   440            ("secret" not in new_args)
   441            and ("cert_chain_file" not in new_args)
   442            and ("private_key_file" not in new_args)
   443        ):
   444            # Assume they want the 'ambassador-certs' secret.
   445            new_args["secret"] = "ambassador-certs"
   446
   447        if termination:
   448            new_args["hosts"] = ["*"]
   449
   450            if validation_ca and validation_ca.get("enabled", True):
   451                for key in ["secret", "cacert_chain_file", "cert_required"]:
   452                    value = validation_ca.get(key, None)
   453
   454                    if value:
   455                        if key == "secret":
   456                            new_args["ca_secret"] = value
   457                        else:
   458                            new_args[key] = value
   459
   460                if ("ca_secret" not in new_args) and ("cacert_chain_file" not in new_args):
   461                    # Assume they want the 'ambassador-cacert' secret.
   462                    new_args["secret"] = "ambassador-cacert"
   463
   464        ctx = IRTLSContext(
   465            ir,
   466            ir.aconf,
   467            rkey=rkey,
   468            name=name,
   469            location=location,
   470            kind="synthesized-TLS-context",
   471            _legacy=True,
   472            **new_args,
   473        )
   474
   475        return ctx
   476
   477
   478class TLSContextFactory:
   479    @classmethod
   480    def load_all(cls, ir: "IR", aconf: Config) -> None:
   481        assert ir
   482
   483        # Save TLS contexts from the aconf into the IR. Note that the contexts in the aconf
   484        # are just ACResources; they need to be turned into IRTLSContexts.
   485        tls_contexts = aconf.get_config("tls_contexts")
   486
   487        if tls_contexts is not None:
   488            for config in tls_contexts.values():
   489                ctx = IRTLSContext(ir, aconf, **config)
   490
   491                if ctx.is_active():
   492                    ctx.referenced_by(config)
   493                    ctx.sourced_by(config)
   494
   495                    ir.save_tls_context(ctx)

View as plain text