...

Text file src/github.com/datawire/ambassador/v2/python/tests/utils.py

Documentation: github.com/datawire/ambassador/v2/python/tests

     1import json
     2import logging
     3import os
     4import subprocess
     5import tempfile
     6from base64 import b64encode
     7from collections import namedtuple
     8
     9from OpenSSL import crypto
    10
    11from ambassador import IR, Cache
    12from ambassador.compile import Compile
    13from ambassador.utils import NullSecretHandler
    14
    15logger = logging.getLogger("ambassador")
    16
    17ENVOY_PATH = os.environ.get("ENVOY_PATH", "/usr/local/bin/envoy")
    18
    19SUPPORTED_ENVOY_VERSIONS = ["V2", "V3"]
    20
    21
    22def zipkin_tracing_service_manifest():
    23    return """
    24---
    25apiVersion: getambassador.io/v3alpha1
    26kind: TracingService
    27metadata:
    28  name: tracing
    29  namespace: ambassador
    30spec:
    31  service: zipkin:9411
    32  driver: zipkin
    33  config: {}
    34"""
    35
    36
    37def default_listener_manifests():
    38    return """
    39---
    40apiVersion: getambassador.io/v3alpha1
    41kind: Listener
    42metadata:
    43  name: listener-8080
    44  namespace: default
    45spec:
    46  port: 8080
    47  protocol: HTTPS
    48  securityModel: XFP
    49  hostBinding:
    50    namespace:
    51      from: ALL
    52---
    53apiVersion: getambassador.io/v3alpha1
    54kind: Listener
    55metadata:
    56  name: listener-8443
    57  namespace: default
    58spec:
    59  port: 8443
    60  protocol: HTTPS
    61  securityModel: XFP
    62  hostBinding:
    63    namespace:
    64      from: ALL
    65"""
    66
    67
    68def module_and_mapping_manifests(module_confs, mapping_confs):
    69    yaml = (
    70        default_listener_manifests()
    71        + """
    72---
    73apiVersion: getambassador.io/v3alpha1
    74kind: Module
    75metadata:
    76  name: ambassador
    77  namespace: default
    78spec:
    79  config:"""
    80    )
    81    if module_confs:
    82        for module_conf in module_confs:
    83            yaml = (
    84                yaml
    85                + """
    86    {}
    87""".format(
    88                    module_conf
    89                )
    90            )
    91    else:
    92        yaml = yaml + " {}\n"
    93
    94    yaml = (
    95        yaml
    96        + """
    97---
    98apiVersion: getambassador.io/v3alpha1
    99kind: Mapping
   100metadata:
   101  name: ambassador
   102  namespace: default
   103spec:
   104  hostname: "*"
   105  prefix: /httpbin/
   106  service: httpbin"""
   107    )
   108    if mapping_confs:
   109        for mapping_conf in mapping_confs:
   110            yaml = (
   111                yaml
   112                + """
   113  {}""".format(
   114                    mapping_conf
   115                )
   116            )
   117    return yaml
   118
   119
   120def _require_no_errors(ir: IR):
   121    assert ir.aconf.errors == {}
   122
   123
   124def _secret_handler():
   125    source_root = tempfile.TemporaryDirectory(prefix="null-secret-", suffix="-source")
   126    cache_dir = tempfile.TemporaryDirectory(prefix="null-secret-", suffix="-cache")
   127    return NullSecretHandler(logger, source_root.name, cache_dir.name, "fake")
   128
   129
   130def compile_with_cachecheck(yaml, envoy_version="V3", errors_ok=False):
   131    # Compile with and without a cache. Neither should produce errors.
   132    cache = Cache(logger)
   133    secret_handler = _secret_handler()
   134    r1 = Compile(logger, yaml, k8s=True, secret_handler=secret_handler, envoy_version=envoy_version)
   135    r2 = Compile(
   136        logger,
   137        yaml,
   138        k8s=True,
   139        secret_handler=secret_handler,
   140        cache=cache,
   141        envoy_version=envoy_version,
   142    )
   143
   144    if not errors_ok:
   145        _require_no_errors(r1["ir"])
   146        _require_no_errors(r2["ir"])
   147
   148    # Both should produce equal Envoy config as sorted json.
   149    r1j = json.dumps(r1[envoy_version.lower()].as_dict(), sort_keys=True, indent=2)
   150    r2j = json.dumps(r2[envoy_version.lower()].as_dict(), sort_keys=True, indent=2)
   151    assert r1j == r2j
   152
   153    # All good.
   154    return r1
   155
   156
   157EnvoyFilterInfo = namedtuple("EnvoyFilterInfo", ["name", "type"])
   158
   159EnvoyHCMInfo = {
   160    "V2": EnvoyFilterInfo(
   161        name="envoy.filters.network.http_connection_manager",
   162        type="type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager",
   163    ),
   164    "V3": EnvoyFilterInfo(
   165        name="envoy.filters.network.http_connection_manager",
   166        type="type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager",
   167    ),
   168}
   169
   170EnvoyTCPInfo = {
   171    "V2": EnvoyFilterInfo(
   172        name="envoy.filters.network.tcp_proxy",
   173        type="type.googleapis.com/envoy.config.filter.network.tcp_proxy.v2.TcpProxy",
   174    ),
   175    "V3": EnvoyFilterInfo(
   176        name="envoy.filters.network.tcp_proxy",
   177        type="type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
   178    ),
   179}
   180
   181
   182def econf_compile(yaml, envoy_version="V2"):
   183    compiled = compile_with_cachecheck(yaml, envoy_version=envoy_version)
   184    return compiled[envoy_version.lower()].as_dict()
   185
   186
   187def econf_foreach_listener(econf, fn, envoy_version="V3", listener_count=1):
   188    listeners = econf["static_resources"]["listeners"]
   189
   190    wanted_plural = "" if (listener_count == 1) else "s"
   191    assert (
   192        len(listeners) == listener_count
   193    ), f"Expected {listener_count} listener{wanted_plural}, got {len(listeners)}"
   194
   195    for listener in listeners:
   196        fn(listener, envoy_version)
   197
   198
   199def econf_foreach_listener_chain(
   200    listener, fn, chain_count=2, need_name=None, need_type=None, dump_info=None
   201):
   202    # We need a specific number of filter chains. Normally it's 2,
   203    # since the compiler tests don't generally supply Listeners or Hosts,
   204    # so we get secure and insecure chains.
   205    filter_chains = listener["filter_chains"]
   206
   207    if dump_info:
   208        dump_info(filter_chains)
   209
   210    wanted_plural = "" if (chain_count == 1) else "s"
   211    assert (
   212        len(filter_chains) == chain_count
   213    ), f"Expected {chain_count} filter chain{wanted_plural}, got {len(filter_chains)}"
   214
   215    for chain in filter_chains:
   216        # We expect one filter on this chain.
   217        filters = chain["filters"]
   218        got_count = len(filters)
   219        got_plural = "" if (got_count == 1) else "s"
   220        assert got_count == 1, f"Expected just one filter, got {got_count} filter{got_plural}"
   221
   222        # The http connection manager is the only filter on the chain from the one and only vhost.
   223        filter = filters[0]
   224
   225        if need_name:
   226            assert filter["name"] == need_name
   227
   228        typed_config = filter["typed_config"]
   229
   230        if need_type:
   231            assert typed_config["@type"] == need_type, f"bad type: {typed_config['@type']}"
   232
   233        fn(typed_config)
   234
   235
   236def econf_foreach_hcm(econf, fn, envoy_version="V3", chain_count=2):
   237    for listener in econf["static_resources"]["listeners"]:
   238        hcm_info = EnvoyHCMInfo[envoy_version]
   239
   240        econf_foreach_listener_chain(
   241            listener, fn, chain_count=chain_count, need_name=hcm_info.name, need_type=hcm_info.type
   242        )
   243
   244
   245def econf_foreach_cluster(econf, fn, name="cluster_httpbin_default"):
   246    for cluster in econf["static_resources"]["clusters"]:
   247        if cluster["name"] != name:
   248            continue
   249
   250        found_cluster = True
   251        r = fn(cluster)
   252        if not r:
   253            break
   254    assert found_cluster
   255
   256
   257def assert_valid_envoy_config(config_dict, v2=False):
   258    with tempfile.NamedTemporaryFile() as temp:
   259        temp.write(bytes(json.dumps(config_dict), encoding="utf-8"))
   260        temp.flush()
   261        f_name = temp.name
   262        cmd = [ENVOY_PATH, "--config-path", f_name, "--mode", "validate"]
   263        if v2:
   264            cmd.append("--bootstrap-version 2")
   265        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
   266        if p.returncode != 0:
   267            print(p.stdout)
   268        p.check_returncode()
   269
   270
   271def create_crl_pem_b64(issuerCert, issuerKey, revokedCerts):
   272    when = b"20220516010101Z"
   273    crl = crypto.CRL()
   274    crl.set_lastUpdate(when)
   275
   276    for revokedCert in revokedCerts:
   277        clientCert = crypto.load_certificate(crypto.FILETYPE_PEM, bytes(revokedCert, "utf-8"))
   278        r = crypto.Revoked()
   279        r.set_serial(bytes("{:x}".format(clientCert.get_serial_number()), "ascii"))
   280        r.set_rev_date(when)
   281        r.set_reason(None)
   282        crl.add_revoked(r)
   283
   284    cert = crypto.load_certificate(crypto.FILETYPE_PEM, bytes(issuerCert, "utf-8"))
   285    key = crypto.load_privatekey(crypto.FILETYPE_PEM, bytes(issuerKey, "utf-8"))
   286    crl.sign(cert, key, b"sha256")
   287    return b64encode(
   288        (crypto.dump_crl(crypto.FILETYPE_PEM, crl).decode("utf-8") + "\n").encode("utf-8")
   289    ).decode("utf-8")

View as plain text