...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador_cli/ert.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador_cli

     1# Copyright 2020 Datawire. All rights reserved.
     2#
     3# Licensed under the Apache License, Version 2.0 (the "License");
     4# you may not use this file except in compliance with the License.
     5# You may obtain a copy of the License at
     6#
     7#     http://www.apache.org/licenses/LICENSE-2.0
     8#
     9# Unless required by applicable law or agreed to in writing, software
    10# distributed under the License is distributed on an "AS IS" BASIS,
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12# See the License for the specific language governing permissions and
    13# limitations under the License
    14
    15########
    16# This is a debugging tool that, given an envoy.json _generated by Ambassador_, will
    17# dump a simplified view of the routing table implemented by that envoy.json.
    18#
    19# **NOTE WELL** that this is not a general-purpose Envoy visualization tool. It is
    20# very specific to Ambassador.
    21########
    22
    23import functools
    24import json
    25import re
    26from collections import OrderedDict
    27from typing import Any, Optional
    28
    29import click
    30import dpath.util
    31
    32
    33def lookup(x: Any, path: str) -> Optional[Any]:
    34    try:
    35        return dpath.util.get(x, path)
    36    except KeyError:
    37        return None
    38
    39
# Matches the filename recorded for a TLS certificate entry when it points at a
# decoded secret under a ".../snapshots/.../secrets-decoded/..." directory.
# main() reads group 1 as the secret's namespace and group 2 as the secret's
# name; group 3 is whatever follows the dot after the uppercase-hex file stem.
reSecret = re.compile(r"^.*/snapshots/([^/]+)/secrets-decoded/([^/]+)/[0-9A-F]+\.(.+)$")

# Use these instead of click.option: they are click.option with the
# show_default keyword pre-bound (True for click_option, False for
# click_option_no_default).
click_option = functools.partial(click.option, show_default=True)
click_option_no_default = functools.partial(click.option, show_default=False)
    45
    46
    47@click.command(help="Show a simplified Envoy config breakdown")
    48@click.argument("envoy-config-path", type=click.Path(exists=True, readable=True))
    49def main(envoy_config_path: str) -> None:
    50    econf = json.load(open(envoy_config_path, "r"))
    51
    52    listeners = lookup(econf, "/static_resources/listeners") or []
    53
    54    for listener in listeners:
    55        bind_addr = lookup(listener, "/address/socket_address/address")
    56        port = lookup(listener, "/address/socket_address/port_value")
    57        proto = lookup(listener, "/address/socket_address/protocol")
    58
    59        lfilters = lookup(listener, "/listener_filters")
    60        lfiltstr = ""
    61
    62        if lfilters:
    63            lfilter_names = [x["name"].replace("envoy.listener.", "") for x in lfilters]
    64            lfiltstr = f" [using {', '.join(lfilter_names)}]"
    65
    66        print(f"LISTENER on {proto} {bind_addr}:{port}{lfiltstr}")
    67
    68        for chain in listener["filter_chains"]:
    69            upp = chain.get("use_proxy_proto", False)
    70            match_proto = lookup(chain, "/filter_chain_match/transport_protocol")
    71            match_domains = lookup(chain, "/filter_chain_match/server_names")
    72
    73            match_domain_str = "*"
    74
    75            if match_domains:
    76                match_domain_str = "/".join(match_domains)
    77
    78            chain_options = []
    79
    80            if match_proto == "tls":
    81                chain_options.append("TLS-only")
    82
    83            ctx = lookup(chain, "/tls_context/common_tls_context/tls_certificates/0") or None
    84            ctx_name = None
    85
    86            if ctx:
    87                for el in ctx.values():
    88                    fname = el.get("filename")
    89
    90                    if fname:
    91                        m = reSecret.match(fname)
    92
    93                        if m:
    94                            secret_name = m.group(2)
    95                            secret_namespace = m.group(1)
    96
    97                            ctx_name = f"{secret_name}.{secret_namespace}"
    98                            break
    99
   100            if ctx_name:
   101                chain_options.append(f"ctx {ctx_name}")
   102
   103            if upp:
   104                chain_options.append("with PROXY proto")
   105
   106            chain_option_str = ""
   107
   108            if chain_options:
   109                chain_option_str = f" ({', '.join(chain_options)})"
   110
   111            print(f"... CHAIN {match_domain_str}{chain_option_str}")
   112
   113            filters = chain["filters"]
   114
   115            for filter in filters:
   116                if filter["name"] == "envoy.http_connection_manager":
   117                    vhosts = lookup(filter, "/config/route_config/virtual_hosts") or []
   118
   119                    for vhost in vhosts:
   120                        domains = vhost["domains"]
   121                        routes = vhost["routes"]
   122
   123                        print(f"... ... VHOST {', '.join(domains)}")
   124
   125                        actions: dict = OrderedDict()
   126
   127                        for route in routes:
   128                            match = route["match"]
   129                            pfx = match.get("prefix") or "-no-prefix-"
   130                            headers = match.get("headers") or []
   131                            security = "insecure"
   132                            authority = ""
   133                            header_count = 0
   134                            action = "Unknown"
   135
   136                            if headers:
   137                                for hdr in headers:
   138                                    if (hdr["name"] == "x-forwarded-proto") and (
   139                                        hdr.get("exact_match") == "https"
   140                                    ):
   141                                        security = "secure"
   142                                    elif hdr["name"] == ":authority":
   143                                        authority = f"@{hdr['exact_match']}"
   144                                    else:
   145                                        header_count += 1
   146
   147                            hdr_str = ""
   148
   149                            if header_count > 0:
   150                                hdr_str = f" ({header_count} additional header{'' if header_count == 1 else 's'})"
   151
   152                            target = f"{pfx}{authority}{hdr_str}"
   153
   154                            action_list = actions.setdefault(target, [])
   155
   156                            if route.get("route"):
   157                                action = "Route"
   158                            elif route.get("redirect"):
   159                                action = "Redirect"
   160
   161                            action_list.append(f"{action} {security}")
   162
   163                        for target, action_list in actions.items():
   164                            print(f"... ... ... {target}: {'; '.join(action_list)}")
   165
   166
# Run the click command when this file is executed as a script; main() is
# called with no arguments here because it is wrapped by click.command.
if __name__ == "__main__":
    main()

View as plain text