...

Text file src/github.com/datawire/ambassador/v2/scripts/devloop-helpers/test-dump.py

Documentation: github.com/datawire/ambassador/v2/scripts/devloop-helpers

     1from typing import Dict, Optional, Tuple, TYPE_CHECKING
     2
     3import logging
     4import sys
     5
     6logging.basicConfig(
     7    level=logging.DEBUG,
     8    format="%(asctime)s test-dump %(levelname)s: %(message)s",
     9    datefmt="%Y-%m-%d %H:%M:%S"
    10)
    11
    12logger = logging.getLogger('ambassador')
    13logger.setLevel(logging.DEBUG)
    14
    15import json
    16
    17from ambassador import Config, IR
    18from ambassador.envoy import V2Config
    19
    20from ambassador.utils import SecretInfo, SavedSecret, SecretHandler
    21from ambassador.fetch import ResourceFetcher
    22
    23if TYPE_CHECKING:
    24    from ambassador.ir.irtlscontext import IRTLSContext # pragma: no cover
    25
    26
    27class SecretRecorder(SecretHandler):
    28    def __init__(self, logger: logging.Logger) -> None:
    29        super().__init__(logger, "-source_root-", "-cache_dir-")
    30        self.needed: Dict[Tuple[str, str], SecretInfo] = {}
    31
    32    # Record what was requested, and always return True.
    33    def load_secret(self, context: 'IRTLSContext',
    34                    secret_name: str, namespace: str) -> Optional[SecretInfo]:
    35        self.logger.debug(f"SecretRecorder: Trying to load secret {secret_name} in namespace {namespace} from TLSContext {context}")
    36        secret_key = ( secret_name, namespace )
    37
    38        if secret_key not in self.needed:
    39            self.needed[secret_key] = SecretInfo(secret_name, namespace, '-crt-', '-key-', decode_b64=False)
    40
    41        return self.needed[secret_key]
    42
    43    # Never cache anything.
    44    def cache_secret(self, context: 'IRTLSContext', secret_info: SecretInfo):
    45        return SavedSecret(secret_info.name, secret_info.namespace, '-crt-path-', '-key-path-',
    46                           { 'tls_crt': '-crt-', 'tls_key': '-key-' })
    47
    48
    49scc = SecretHandler(logger, "test-dump", "ss")
    50
    51yamlpath = sys.argv[1] if len(sys.argv) > 1 else "consul-3.yaml"
    52
    53aconf = Config()
    54fetcher = ResourceFetcher(logger, aconf)
    55fetcher.parse_watt(open(yamlpath, "r").read())
    56
    57aconf.load_all(fetcher.sorted())
    58
    59open("test-aconf.json", "w").write(aconf.as_json())
    60
    61# sys.exit(0)
    62
    63ir = IR(aconf, secret_handler=scc)
    64
    65open("test-ir.json", "w").write(ir.as_json())
    66
    67econf = V2Config(ir)
    68
    69open("test-v2.json", "w").write(econf.as_json())

View as plain text