1import hashlib
2import logging
3import os
4from typing import Optional, Tuple
5
6import pytest
7
8from ambassador import IR, Config
9from ambassador.fetch import ResourceFetcher
10from ambassador.utils import SavedSecret, SecretHandler
11
12
13# MemorySecretHandler is a degenerate SecretHandler that doesn't actually
14# cache anything to disk. It will never load a secret that isn't already
15# in the aconf.
16class MemorySecretHandler(SecretHandler):
17 def cache_internal(
18 self,
19 name: str,
20 namespace: str,
21 tls_crt: Optional[str],
22 tls_key: Optional[str],
23 user_key: Optional[str],
24 root_crt: Optional[str],
25 ) -> SavedSecret:
26 # This is mostly ripped from ambassador.utils.SecretHandler.cache_internal,
27 # just without actually saving anything.
28 tls_crt_path = None
29 tls_key_path = None
30 user_key_path = None
31 root_crt_path = None
32 cert_data = None
33
34 # Don't save if it has neither a tls_crt or a user_key or the root_crt
35 if tls_crt or user_key or root_crt:
36 h = hashlib.new("sha1")
37
38 for el in [tls_crt, tls_key, user_key]:
39 if el:
40 h.update(el.encode("utf-8"))
41
42 fp = h.hexdigest().upper()
43
44 if tls_crt:
45 tls_crt_path = f"//test-secret-{fp}.crt"
46
47 if tls_key:
48 tls_key_path = f"//test-secret-{fp}.key"
49
50 if user_key:
51 user_key_path = f"//test-secret-{fp}.user"
52
53 if root_crt:
54 root_crt_path = f"//test-secret-{fp}.root.crt"
55
56 cert_data = {
57 "tls_crt": tls_crt,
58 "tls_key": tls_key,
59 "user_key": user_key,
60 "root_crt": root_crt,
61 }
62
63 self.logger.debug(
64 f"saved secret {name}.{namespace}: {tls_crt_path}, {tls_key_path}, {root_crt_path}"
65 )
66
67 return SavedSecret(
68 name, namespace, tls_crt_path, tls_key_path, user_key_path, root_crt_path, cert_data
69 )
70
71
72def _get_config_and_ir(logger: logging.Logger, watt: str) -> Tuple[Config, IR]:
73 aconf = Config()
74 fetcher = ResourceFetcher(logger, aconf)
75 fetcher.parse_watt(watt)
76 aconf.load_all(fetcher.sorted())
77
78 secret_handler = MemorySecretHandler(
79 logger, "/tmp/unit-test-source-root", "/tmp/unit-test-cache-dir", "0"
80 )
81 ir = IR(aconf, logger=logger, file_checker=lambda path: True, secret_handler=secret_handler)
82
83 assert ir
84 return aconf, ir
85
86
87def _get_errors(caplog: pytest.LogCaptureFixture, logger_name: str, watt_data_filename: str):
88 watt_data = open(watt_data_filename).read()
89
90 aconf, ir = _get_config_and_ir(logging.getLogger(logger_name), watt_data)
91
92 log_errors = [
93 rec for rec in caplog.record_tuples if rec[0] == logger_name and rec[1] > logging.INFO
94 ]
95
96 aconf_errors = aconf.errors
97 if "-global-" in aconf_errors:
98 # We expect some global errors related to us not being a real Emissary instance, such as
99 # "Pod labels are not mounted in the container". Ignore those.
100 del aconf_errors["-global-"]
101
102 return log_errors, aconf_errors
103
104
105@pytest.mark.compilertest
106def test_acme_privatekey_secrets(caplog: pytest.LogCaptureFixture):
107 caplog.set_level(logging.DEBUG)
108
109 nl = "\n"
110 tab = "\t"
111
112 # What this test is really about is ensuring that test-acme-private-key-snapshot.json doesn't
113 # emit any errors. But, in order to validate the test itself and ensure that the test is
114 # checking for errors in the correct place, we'll also run against a bad version of that file
115 # and check that we *do* see errors.
116
117 badsnap_log_errors, badsnap_aconf_errors = _get_errors(
118 caplog,
119 "test_acme_privatekey_secrets-bad",
120 os.path.join(
121 os.path.dirname(os.path.abspath(__file__)),
122 "testdata",
123 "test-acme-private-key-snapshot-bad.json",
124 ),
125 )
126 assert badsnap_log_errors
127 assert not badsnap_aconf_errors, "Wanted no aconf errors but got:%s" % "".join(
128 [f"{nl} {err}" for err in badsnap_aconf_errors]
129 )
130
131 goodsnap_log_errors, goodsnap_aconf_errors = _get_errors(
132 caplog,
133 "test_acme_privatekey_secrets",
134 os.path.join(
135 os.path.dirname(os.path.abspath(__file__)),
136 "testdata",
137 "test-acme-private-key-snapshot.json",
138 ),
139 )
140 assert not goodsnap_log_errors, "Wanted no logged errors bug got:%s" % "".join(
141 [
142 f"{nl} {logging.getLevelName(rec[1])}{tab}{rec[0]}:{rec[2]}"
143 for rec in goodsnap_log_errors
144 ]
145 )
146 assert not goodsnap_aconf_errors, "Wanted no aconf errors but got:%s" % "".join(
147 [f"{nl} {err}" for err in goodsnap_aconf_errors]
148 )
View as plain text