...

Text file src/github.com/emissary-ingress/emissary/v3/python/tests/unit/test_acme_privatekey_secrets.py

Documentation: github.com/emissary-ingress/emissary/v3/python/tests/unit

     1import hashlib
     2import logging
     3import os
     4from typing import Optional, Tuple
     5
     6import pytest
     7
     8from ambassador import IR, Config
     9from ambassador.fetch import ResourceFetcher
    10from ambassador.utils import SavedSecret, SecretHandler
    11
    12
    13# MemorySecretHandler is a degenerate SecretHandler that doesn't actually
    14# cache anything to disk. It will never load a secret that isn't already
    15# in the aconf.
    16class MemorySecretHandler(SecretHandler):
    17    def cache_internal(
    18        self,
    19        name: str,
    20        namespace: str,
    21        tls_crt: Optional[str],
    22        tls_key: Optional[str],
    23        user_key: Optional[str],
    24        root_crt: Optional[str],
    25    ) -> SavedSecret:
    26        # This is mostly ripped from ambassador.utils.SecretHandler.cache_internal,
    27        # just without actually saving anything.
    28        tls_crt_path = None
    29        tls_key_path = None
    30        user_key_path = None
    31        root_crt_path = None
    32        cert_data = None
    33
    34        # Don't save if it has neither a tls_crt or a user_key or the root_crt
    35        if tls_crt or user_key or root_crt:
    36            h = hashlib.new("sha1")
    37
    38            for el in [tls_crt, tls_key, user_key]:
    39                if el:
    40                    h.update(el.encode("utf-8"))
    41
    42            fp = h.hexdigest().upper()
    43
    44            if tls_crt:
    45                tls_crt_path = f"//test-secret-{fp}.crt"
    46
    47            if tls_key:
    48                tls_key_path = f"//test-secret-{fp}.key"
    49
    50            if user_key:
    51                user_key_path = f"//test-secret-{fp}.user"
    52
    53            if root_crt:
    54                root_crt_path = f"//test-secret-{fp}.root.crt"
    55
    56            cert_data = {
    57                "tls_crt": tls_crt,
    58                "tls_key": tls_key,
    59                "user_key": user_key,
    60                "root_crt": root_crt,
    61            }
    62
    63            self.logger.debug(
    64                f"saved secret {name}.{namespace}: {tls_crt_path}, {tls_key_path}, {root_crt_path}"
    65            )
    66
    67        return SavedSecret(
    68            name, namespace, tls_crt_path, tls_key_path, user_key_path, root_crt_path, cert_data
    69        )
    70
    71
    72def _get_config_and_ir(logger: logging.Logger, watt: str) -> Tuple[Config, IR]:
    73    aconf = Config()
    74    fetcher = ResourceFetcher(logger, aconf)
    75    fetcher.parse_watt(watt)
    76    aconf.load_all(fetcher.sorted())
    77
    78    secret_handler = MemorySecretHandler(
    79        logger, "/tmp/unit-test-source-root", "/tmp/unit-test-cache-dir", "0"
    80    )
    81    ir = IR(aconf, logger=logger, file_checker=lambda path: True, secret_handler=secret_handler)
    82
    83    assert ir
    84    return aconf, ir
    85
    86
    87def _get_errors(caplog: pytest.LogCaptureFixture, logger_name: str, watt_data_filename: str):
    88    watt_data = open(watt_data_filename).read()
    89
    90    aconf, ir = _get_config_and_ir(logging.getLogger(logger_name), watt_data)
    91
    92    log_errors = [
    93        rec for rec in caplog.record_tuples if rec[0] == logger_name and rec[1] > logging.INFO
    94    ]
    95
    96    aconf_errors = aconf.errors
    97    if "-global-" in aconf_errors:
    98        # We expect some global errors related to us not being a real Emissary instance, such as
    99        # "Pod labels are not mounted in the container".  Ignore those.
   100        del aconf_errors["-global-"]
   101
   102    return log_errors, aconf_errors
   103
   104
   105@pytest.mark.compilertest
   106def test_acme_privatekey_secrets(caplog: pytest.LogCaptureFixture):
   107    caplog.set_level(logging.DEBUG)
   108
   109    nl = "\n"
   110    tab = "\t"
   111
   112    # What this test is really about is ensuring that test-acme-private-key-snapshot.json doesn't
   113    # emit any errors.  But, in order to validate the test itself and ensure that the test is
   114    # checking for errors in the correct place, we'll also run against a bad version of that file
   115    # and check that we *do* see errors.
   116
   117    badsnap_log_errors, badsnap_aconf_errors = _get_errors(
   118        caplog,
   119        "test_acme_privatekey_secrets-bad",
   120        os.path.join(
   121            os.path.dirname(os.path.abspath(__file__)),
   122            "testdata",
   123            "test-acme-private-key-snapshot-bad.json",
   124        ),
   125    )
   126    assert badsnap_log_errors
   127    assert not badsnap_aconf_errors, "Wanted no aconf errors but got:%s" % "".join(
   128        [f"{nl}    {err}" for err in badsnap_aconf_errors]
   129    )
   130
   131    goodsnap_log_errors, goodsnap_aconf_errors = _get_errors(
   132        caplog,
   133        "test_acme_privatekey_secrets",
   134        os.path.join(
   135            os.path.dirname(os.path.abspath(__file__)),
   136            "testdata",
   137            "test-acme-private-key-snapshot.json",
   138        ),
   139    )
   140    assert not goodsnap_log_errors, "Wanted no logged errors bug got:%s" % "".join(
   141        [
   142            f"{nl}    {logging.getLevelName(rec[1])}{tab}{rec[0]}:{rec[2]}"
   143            for rec in goodsnap_log_errors
   144        ]
   145    )
   146    assert not goodsnap_aconf_errors, "Wanted no aconf errors but got:%s" % "".join(
   147        [f"{nl}    {err}" for err in goodsnap_aconf_errors]
   148    )

View as plain text