...

Text file src/github.com/datawire/ambassador/v2/python/watch_hook.py

Documentation: github.com/datawire/ambassador/v2/python

     1#!/usr/bin/python
     2
     3import logging
     4import os
     5import sys
     6from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
     7
     8from ambassador import IR, Config
     9from ambassador.fetch import ResourceFetcher
    10from ambassador.utils import ParsedService as Service
    11from ambassador.utils import SavedSecret, SecretHandler, SecretInfo, dump_json
    12
    13if TYPE_CHECKING:
    14    from ambassador.ir.irresource import IRResource  # pragma: no cover
    15
    16# default AES's Secret name
    17# (by default, we assume it will be in the same namespace as Ambassador)
    18DEFAULT_AES_SECRET_NAME = "ambassador-edge-stack"
    19
    20# the name of some env vars that can be used for overriding
    21# the AES's Secret name/namespace
    22ENV_AES_SECRET_NAME = "AMBASSADOR_AES_SECRET_NAME"
    23ENV_AES_SECRET_NAMESPACE = "AMBASSADOR_AES_SECRET_NAMESPACE"
    24
    25# the name of some env vars that can be used for overriding
    26# the Cloud Connect Token resource name/namespace
    27ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAME = "AGENT_CONFIG_RESOURCE_NAME"
    28ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAMESPACE = "AGENT_NAMESPACE"
    29DEFAULT_CLOUD_CONNECT_TOKEN_RESOURCE_NAME = "ambassador-agent-cloud-token"
    30
    31# Fake SecretHandler for our fake IR, below.
    32
    33
    34class SecretRecorder(SecretHandler):
    35    def __init__(self, logger: logging.Logger) -> None:
    36        super().__init__(logger, "-source_root-", "-cache_dir-", "0")
    37        self.needed: Dict[Tuple[str, str], SecretInfo] = {}
    38
    39    # Record what was requested, and always return success.
    40    def load_secret(
    41        self, resource: "IRResource", secret_name: str, namespace: str
    42    ) -> Optional[SecretInfo]:
    43        self.logger.debug(
    44            "SecretRecorder (%s %s): load secret %s in namespace %s"
    45            % (resource.kind, resource.name, secret_name, namespace)
    46        )
    47
    48        return self.record_secret(secret_name, namespace)
    49
    50    def record_secret(self, secret_name: str, namespace: str) -> Optional[SecretInfo]:
    51        secret_key = (secret_name, namespace)
    52
    53        if secret_key not in self.needed:
    54            self.needed[secret_key] = SecretInfo(
    55                secret_name, namespace, "needed-secret", "-crt-", "-key-", decode_b64=False
    56            )
    57        return self.needed[secret_key]
    58
    59    # Secrets that're still needed also get recorded.
    60    def still_needed(self, resource: "IRResource", secret_name: str, namespace: str) -> None:
    61        self.logger.debug(
    62            "SecretRecorder (%s %s): secret %s in namespace %s is still needed"
    63            % (resource.kind, resource.name, secret_name, namespace)
    64        )
    65
    66        self.record_secret(secret_name, namespace)
    67
    68    # Never cache anything.
    69    def cache_secret(self, resource: "IRResource", secret_info: SecretInfo):
    70        self.logger.debug(
    71            "SecretRecorder (%s %s): skipping cache step for secret %s in namespace %s"
    72            % (resource.kind, resource.name, secret_info.name, secret_info.namespace)
    73        )
    74
    75        return SavedSecret(
    76            secret_info.name,
    77            secret_info.namespace,
    78            "-crt-path-",
    79            "-key-path-",
    80            "-user-path-",
    81            "-root-crt-path",
    82            {"tls.crt": "-crt-", "tls.key": "-key-", "user.key": "-user-"},
    83        )
    84
    85
    86# XXX Sooooo there's some ugly stuff here.
    87#
    88# We need to do a little bit of the same work that the IR does for things like
    89# managing Resolvers and parsing service names. However, we really don't want to
    90# do all the work of instantiating an IR.
    91#
    92# The solution here is to subclass the IR and take advantage of the watch_only
    93# initialization keyword, which skips the hard parts of building an IR.
    94
    95
    96class FakeIR(IR):
    97    def __init__(self, aconf: Config, logger=None) -> None:
    98        # If we're asked about a secret, record interest in that secret.
    99        self.secret_recorder = SecretRecorder(logger)
   100
   101        # If we're asked about a file, it's good.
   102        file_checker = lambda path: True
   103
   104        super().__init__(
   105            aconf,
   106            logger=logger,
   107            watch_only=True,
   108            secret_handler=self.secret_recorder,
   109            file_checker=file_checker,
   110        )
   111
   112    # Don't bother actually saving resources that come up when working with
   113    # the faked modules.
   114    def save_resource(self, resource: "IRResource") -> "IRResource":
   115        return resource
   116
   117
   118class WatchHook:
   119    def __init__(self, logger, yaml_stream) -> None:
   120        # Watch management
   121
   122        self.logger = logger
   123
   124        self.consul_watches: List[Dict[str, str]] = []
   125        self.kube_watches: List[Dict[str, str]] = []
   126
   127        self.load_yaml(yaml_stream)
   128
   129    def add_kube_watch(
   130        self,
   131        what: str,
   132        kind: str,
   133        namespace: Optional[str],
   134        field_selector: Optional[str] = None,
   135        label_selector: Optional[str] = None,
   136    ) -> None:
   137        watch = {"kind": kind}
   138
   139        if namespace:
   140            watch["namespace"] = namespace
   141
   142        if field_selector:
   143            watch["field-selector"] = field_selector
   144
   145        if label_selector:
   146            watch["label-selector"] = label_selector
   147
   148        self.logger.debug(f"{what}: add watch {watch}")
   149        self.kube_watches.append(watch)
   150
   151    def load_yaml(self, yaml_stream):
   152        self.aconf = Config()
   153
   154        fetcher = ResourceFetcher(self.logger, self.aconf, watch_only=True)
   155        fetcher.parse_watt(yaml_stream.read())
   156
   157        self.aconf.load_all(fetcher.sorted())
   158
   159        # We can lift mappings straight from the aconf...
   160        mappings = self.aconf.get_config("mappings") or {}
   161
   162        # ...but we need the fake IR to deal with resolvers and TLS contexts.
   163        self.fake = FakeIR(self.aconf, logger=self.logger)
   164
   165        self.logger.debug("IR: %s" % self.fake.as_json())
   166
   167        resolvers = self.fake.resolvers
   168        contexts = self.fake.tls_contexts
   169
   170        self.logger.debug(f"mappings: {len(mappings)}")
   171        self.logger.debug(f"resolvers: {len(resolvers)}")
   172        self.logger.debug(f"contexts: {len(contexts)}")
   173
   174        global_resolver = self.fake.ambassador_module.get("resolver", None)
   175
   176        global_label_selector = os.environ.get("AMBASSADOR_LABEL_SELECTOR", "")
   177        self.logger.debug("label-selector: %s" % global_label_selector)
   178
   179        cloud_connect_token_resource_name = os.getenv(
   180            ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAME, DEFAULT_CLOUD_CONNECT_TOKEN_RESOURCE_NAME
   181        )
   182        cloud_connect_token_resource_namespace = os.getenv(
   183            ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAMESPACE, Config.ambassador_namespace
   184        )
   185        self.logger.debug(
   186            f"cloud-connect-token: need configmap/secret {cloud_connect_token_resource_name}.{cloud_connect_token_resource_namespace}"
   187        )
   188        self.add_kube_watch(
   189            f"ConfigMap {cloud_connect_token_resource_name}",
   190            "configmap",
   191            namespace=cloud_connect_token_resource_namespace,
   192            field_selector=f"metadata.name={cloud_connect_token_resource_name}",
   193        )
   194        self.add_kube_watch(
   195            f"Secret {cloud_connect_token_resource_name}",
   196            "secret",
   197            namespace=cloud_connect_token_resource_namespace,
   198            field_selector=f"metadata.name={cloud_connect_token_resource_name}",
   199        )
   200
   201        # watch the AES Secret if the edge stack is running
   202        if self.fake.edge_stack_allowed:
   203            aes_secret_name = os.getenv(ENV_AES_SECRET_NAME, DEFAULT_AES_SECRET_NAME)
   204            aes_secret_namespace = os.getenv(ENV_AES_SECRET_NAMESPACE, Config.ambassador_namespace)
   205            self.logger.debug(
   206                f"edge stack detected: need secret {aes_secret_name}.{aes_secret_namespace}"
   207            )
   208            self.add_kube_watch(
   209                f"Secret {aes_secret_name}",
   210                "secret",
   211                namespace=aes_secret_namespace,
   212                field_selector=f"metadata.name={aes_secret_name}",
   213            )
   214
   215        # Walk hosts.
   216        for host in self.fake.get_hosts():
   217            sel = host.get("selector") or {}
   218            match_labels = sel.get("matchLabels") or {}
   219
   220            label_selectors: List[str] = []
   221
   222            if global_label_selector:
   223                label_selectors.append(global_label_selector)
   224
   225            if match_labels:
   226                label_selectors += [f"{l}={v}" for l, v in match_labels.items()]
   227
   228            label_selector = ",".join(label_selectors) if label_selectors else None
   229
   230            for wanted_kind in ["service", "secret"]:
   231                self.add_kube_watch(
   232                    f"Host {host.name}", wanted_kind, host.namespace, label_selector=label_selector
   233                )
   234
   235        for mname, mapping in mappings.items():
   236            res_name = mapping.get("resolver", None)
   237            res_source = "mapping"
   238
   239            if not res_name:
   240                res_name = global_resolver
   241                res_source = "defaults"
   242
   243            ctx_name = mapping.get("tls", None)
   244
   245            self.logger.debug(
   246                f"Mapping {mname}: resolver {res_name} from {res_source}, service {mapping.service}, tls {ctx_name}"
   247            )
   248
   249            if res_name:
   250                resolver = resolvers.get(res_name, None)
   251                self.logger.debug(f"-> resolver {resolver}")
   252
   253                if resolver:
   254                    svc = Service(logger, mapping.service, ctx_name)
   255
   256                    if resolver.kind == "ConsulResolver":
   257                        self.logger.debug(f"Mapping {mname} uses Consul resolver {res_name}")
   258
   259                        # At the moment, we stuff the resolver's datacenter into the association
   260                        # ID for this watch. The ResourceFetcher relies on that.
   261
   262                        self.consul_watches.append(
   263                            {
   264                                "id": resolver.datacenter,
   265                                "consul-address": resolver.address,
   266                                "datacenter": resolver.datacenter,
   267                                "service-name": svc.hostname,
   268                            }
   269                        )
   270                    elif resolver.kind == "KubernetesEndpointResolver":
   271                        host = svc.hostname
   272                        namespace = Config.ambassador_namespace
   273
   274                        if not host:
   275                            # This is really kind of impossible.
   276                            self.logger.error(
   277                                f"KubernetesEndpointResolver {res_name} has no 'hostname'"
   278                            )
   279                            continue
   280
   281                        if "." in host:
   282                            (host, namespace) = host.split(".", 2)[0:2]
   283
   284                        self.logger.debug(
   285                            f"...kube endpoints: svc {svc.hostname} -> host {host} namespace {namespace}"
   286                        )
   287
   288                        self.add_kube_watch(
   289                            f"endpoint",
   290                            "endpoints",
   291                            namespace,
   292                            label_selector=global_label_selector,
   293                            field_selector=f"metadata.name={host}",
   294                        )
   295
   296        for secret_key, secret_info in self.fake.secret_recorder.needed.items():
   297            self.logger.debug(f"need secret {secret_info.name}.{secret_info.namespace}")
   298
   299            self.add_kube_watch(
   300                f"needed secret",
   301                "secret",
   302                secret_info.namespace,
   303                label_selector=global_label_selector,
   304                field_selector=f"metadata.name={secret_info.name}",
   305            )
   306
   307        if self.fake.edge_stack_allowed:
   308            # If the edge stack is allowed, make sure we watch for our fallback context.
   309            self.add_kube_watch(
   310                "Fallback TLSContext", "TLSContext", namespace=Config.ambassador_namespace
   311            )
   312
   313        ambassador_basedir = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")
   314
   315        if os.path.exists(os.path.join(ambassador_basedir, ".ambassadorinstallations_ok")):
   316            self.add_kube_watch(
   317                "AmbassadorInstallations",
   318                "ambassadorinstallations.getambassador.io",
   319                Config.ambassador_namespace,
   320            )
   321
   322        ambassador_knative_requested = (
   323            os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "-unset-").lower() == "true"
   324        )
   325
   326        if ambassador_knative_requested:
   327            self.logger.debug("Looking for Knative support...")
   328
   329            if os.path.exists(os.path.join(ambassador_basedir, ".knative_clusteringress_ok")):
   330                # Watch for clusteringresses.networking.internal.knative.dev in any namespace and with any labels.
   331
   332                self.logger.debug("watching for clusteringresses.networking.internal.knative.dev")
   333                self.add_kube_watch(
   334                    "Knative clusteringresses",
   335                    "clusteringresses.networking.internal.knative.dev",
   336                    None,
   337                )
   338
   339            if os.path.exists(os.path.join(ambassador_basedir, ".knative_ingress_ok")):
   340                # Watch for ingresses.networking.internal.knative.dev in any namespace and
   341                # with any labels.
   342
   343                self.add_kube_watch(
   344                    "Knative ingresses", "ingresses.networking.internal.knative.dev", None
   345                )
   346
   347        self.watchset = {
   348            "kubernetes-watches": self.kube_watches,
   349            "consul-watches": self.consul_watches,
   350        }
   351
   352        save_dir = os.environ.get("AMBASSADOR_WATCH_DIR", "/tmp")
   353
   354        if save_dir:
   355            watchset = dump_json(self.watchset)
   356            with open(os.path.join(save_dir, "watch.json"), "w") as output:
   357                output.write(watchset)
   358
   359
   360#### Mainline.
   361
   362if __name__ == "__main__":
   363    loglevel = logging.INFO
   364
   365    args = sys.argv[1:]
   366
   367    if args:
   368        if args[0] == "--debug":
   369            loglevel = logging.DEBUG
   370            args.pop(0)
   371        elif args[0].startswith("--"):
   372            raise Exception(f"Usage: {os.path.basename(sys.argv[0])} [--debug] [path]")
   373
   374    logging.basicConfig(
   375        level=loglevel,
   376        format="%(asctime)s watch-hook %(levelname)s: %(message)s",
   377        datefmt="%Y-%m-%d %H:%M:%S",
   378    )
   379
   380    alogger = logging.getLogger("ambassador")
   381    alogger.setLevel(logging.INFO)
   382
   383    logger = logging.getLogger("watch_hook")
   384    logger.setLevel(loglevel)
   385
   386    yaml_stream = sys.stdin
   387
   388    if args:
   389        yaml_stream = open(args[0], "r")
   390
   391    wh = WatchHook(logger, yaml_stream)
   392
   393    watchset = dump_json(wh.watchset)
   394    sys.stdout.write(watchset)

View as plain text