...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador_diag/diagd.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador_diag

     1#!python
     2
     3# Copyright 2018 Datawire. All rights reserved.
     4#
     5# Licensed under the Apache License, Version 2.0 (the "License");
     6# you may not use this file except in compliance with the License.
     7# You may obtain a copy of the License at
     8#
     9#     http://www.apache.org/licenses/LICENSE-2.0
    10#
    11# Unless required by applicable law or agreed to in writing, software
    12# distributed under the License is distributed on an "AS IS" BASIS,
    13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14# See the License for the specific language governing permissions and
    15# limitations under the License
    16import concurrent.futures
    17import copy
    18import datetime
    19import difflib
    20import functools
    21import json
    22import logging
    23import multiprocessing
    24import os
    25import queue
    26import re
    27import signal
    28import subprocess
    29import sys
    30import threading
    31import time
    32import traceback
    33import uuid
    34from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
    35from typing import cast as typecast
    36
    37import click
    38import gunicorn.app.base
    39import jsonpatch
    40import requests
    41from expiringdict import ExpiringDict
    42from flask import Flask, Response
    43from flask import json as flask_json
    44from flask import jsonify, render_template, request, send_from_directory
    45from pkg_resources import Requirement, resource_filename
    46from prometheus_client import CollectorRegistry, Gauge, Info, ProcessCollector, generate_latest
    47from pythonjsonlogger import jsonlogger
    48from typing_extensions import NotRequired, TypedDict
    49
    50from ambassador import IR, Cache, Config, Diagnostics, EnvoyConfig, Scout, Version
    51from ambassador.ambscout import LocalScout
    52from ambassador.constants import Constants
    53from ambassador.diagnostics import EnvoyStats, EnvoyStatsMgr
    54from ambassador.fetch import ResourceFetcher
    55from ambassador.ir.irambassador import IRAmbassador
    56from ambassador.reconfig_stats import ReconfigStats
    57from ambassador.utils import (
    58    FSSecretHandler,
    59    PeriodicTrigger,
    60    SecretHandler,
    61    SystemInfo,
    62    Timer,
    63    dump_json,
    64    load_url_contents,
    65    parse_bool,
    66    parse_json,
    67)
    68
    69if TYPE_CHECKING:
    70    from ambassador.ir.irtlscontext import IRTLSContext  # pragma: no cover
    71
__version__ = Version

# Timestamp captured once, when this module is imported.
boot_time = datetime.datetime.now()

# allows 10 concurrent users, with a request timeout of 60 seconds
tvars_cache = ExpiringDict(max_len=10, max_age_seconds=60)

# Logging is configured at import time, in one of two modes selected by the
# AMBASSADOR_JSON_LOGGING environment variable:
#   - JSON logging: a StreamHandler with a JSON formatter is attached to the
#     root logger and to every logger that already exists.
#   - Plain logging: logging.basicConfig() with a diagd-specific format, at
#     INFO unless AES_LOG_LEVEL names a valid level.
# logHandler stays None in the plain-logging case.
logHandler = None
if parse_bool(os.environ.get("AMBASSADOR_JSON_LOGGING", "false")):
    # NOTE(review): unlike the basicConfig format below, this string is never
    # %-formatted here, so the doubled percent signs (and the bare
    # "(threadName)s") are passed to JsonFormatter verbatim -- confirm that
    # JsonFormatter is happy with that.
    jsonFormatter = jsonlogger.JsonFormatter(
        "%%(asctime)s %%(filename)s %%(lineno)d %%(process)d (threadName)s %%(levelname)s %%(message)s"
    )
    logHandler = logging.StreamHandler()
    logHandler.setFormatter(jsonFormatter)

    # Set the root logger to INFO level and tell it to use the new log handler.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(logHandler)

    # Update all of the other loggers to also use the new log handler.
    loggingManager = getattr(logging.root, "manager", None)
    if loggingManager is not None:
        for name in loggingManager.loggerDict:
            logging.getLogger(name).addHandler(logHandler)
    else:
        print("Could not find a logging manager. Some logging may not be properly JSON formatted!")
else:
    # Default log level
    level = logging.INFO

    # Check for env var log level
    if level_name := os.getenv("AES_LOG_LEVEL"):
        # getLevelName() returns an int only when given a known level name;
        # anything else is ignored and we stay at the default.
        level_number = logging.getLevelName(level_name.upper())

        if isinstance(level_number, int):
            level = level_number

    # Set defaults for all loggers
    # The format string is %-formatted with the version right here, which is
    # why the logging placeholders are written with doubled percent signs.
    logging.basicConfig(
        level=level,
        format="%%(asctime)s diagd %s [P%%(process)dT%%(threadName)s] %%(levelname)s: %%(message)s"
        % __version__,
        datefmt="%Y-%m-%d %H:%M:%S",
    )

# Shut up Werkzeug's standard request logs -- they're just too noisy.
logging.getLogger("werkzeug").setLevel(logging.CRITICAL)

# Likewise make requests a bit quieter.
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)

# Documentation links, keyed by the kind of Ambassador resource they describe.
ambassador_targets = {
    "mapping": "https://www.getambassador.io/reference/configuration#mappings",
    "module": "https://www.getambassador.io/reference/configuration#modules",
}
   129
   130# envoy_targets = {
   131#     'route': 'https://envoyproxy.github.io/envoy/configuration/http_conn_man/route_config/route.html',
   132#     'cluster': 'https://envoyproxy.github.io/envoy/configuration/cluster_manager/cluster.html',
   133# }
   134
   135
   136def number_of_workers():
   137    return (multiprocessing.cpu_count() * 2) + 1
   138
   139
   140class DiagApp(Flask):
   141    cache: Optional[Cache]
   142    ambex_pid: int
   143    kick: Optional[str]
   144    estatsmgr: EnvoyStatsMgr
   145    config_path: Optional[str]
   146    snapshot_path: str
   147    bootstrap_path: str
   148    ads_path: str
   149    clustermap_path: str
   150    health_checks: bool
   151    no_envoy: bool
   152    debugging: bool
   153    allow_fs_commands: bool
   154    report_action_keys: bool
   155    verbose: bool
   156    notice_path: str
   157    logger: logging.Logger  # type: ignore # FIXME(lukeshu): Mypy's probably right on this one, but it'd be a lot of work to fix
   158    aconf: Config
   159    ir: Optional[IR]
   160    econf: Optional[EnvoyConfig]
   161    # self.diag is actually a property
   162    _diag: Optional[Diagnostics]
   163    notices: "Notices"
   164    scout: Scout
   165    watcher: "AmbassadorEventWatcher"
   166    stats_updater: Optional[PeriodicTrigger]
   167    scout_checker: Optional[PeriodicTrigger]
   168    timer_logger: Optional[PeriodicTrigger]
   169    last_request_info: Dict[str, int]
   170    last_request_time: Optional[datetime.datetime]
   171    latest_snapshot: str
   172    banner_endpoint: Optional[str]
   173    metrics_endpoint: Optional[str]
   174
   175    # Reconfiguration stats
   176    reconf_stats: ReconfigStats
   177
   178    # Custom metrics registry to weed-out default metrics collectors because the
   179    # default collectors can't be prefixed/namespaced with ambassador_.
   180    # Using the default metrics collectors would lead to name clashes between the Python and Go instrumentations.
   181    metrics_registry: CollectorRegistry
   182
   183    config_lock: threading.Lock
   184    diag_lock: threading.Lock
   185
   186    def setup(
   187        self,
   188        snapshot_path: str,
   189        bootstrap_path: str,
   190        ads_path: str,
   191        config_path: Optional[str],
   192        ambex_pid: int,
   193        kick: Optional[str],
   194        banner_endpoint: Optional[str],
   195        metrics_endpoint: Optional[str],
   196        k8s=False,
   197        do_checks=True,
   198        no_envoy=False,
   199        reload=False,
   200        debug=False,
   201        verbose=False,
   202        notices=None,
   203        validation_retries=5,
   204        allow_fs_commands=False,
   205        local_scout=False,
   206        report_action_keys=False,
   207        enable_fast_reconfigure=False,
   208        clustermap_path=None,
   209    ):
   210        self.health_checks = do_checks
   211        self.no_envoy = no_envoy
   212        self.debugging = reload
   213        self.verbose = verbose
   214        self.notice_path = notices
   215        self.notices = Notices(self.notice_path)
   216        self.notices.reset()
   217        self.k8s = k8s
   218        self.validation_retries = validation_retries
   219        self.allow_fs_commands = allow_fs_commands
   220        self.local_scout = local_scout
   221        self.report_action_keys = report_action_keys
   222        self.banner_endpoint = banner_endpoint
   223        self.metrics_endpoint = metrics_endpoint
   224        self.metrics_registry = CollectorRegistry(auto_describe=True)
   225        self.enable_fast_reconfigure = enable_fast_reconfigure
   226
   227        # Init logger, inherits settings from default
   228        self.logger = logging.getLogger("ambassador.diagd")
   229
   230        # Initialize the Envoy stats manager...
   231        self.estatsmgr = EnvoyStatsMgr(self.logger)
   232
   233        # ...and the incremental-reconfigure stats.
   234        self.reconf_stats = ReconfigStats(self.logger)
   235
   236        # This will raise an exception and crash if you pass it a string. That's intentional.
   237        self.ambex_pid = int(ambex_pid)
   238        self.kick = kick
   239
   240        # Initialize the cache if we're allowed to.
   241        if self.enable_fast_reconfigure:
   242            self.logger.info("AMBASSADOR_FAST_RECONFIGURE enabled, initializing cache")
   243            self.cache = Cache(self.logger)
   244        else:
   245            self.logger.info("AMBASSADOR_FAST_RECONFIGURE disabled, not initializing cache")
   246            self.cache = None
   247
   248        # Use Timers to keep some stats on reconfigurations
   249        self.config_timer = Timer("reconfiguration", self.metrics_registry)
   250        self.fetcher_timer = Timer("Fetcher", self.metrics_registry)
   251        self.aconf_timer = Timer("AConf", self.metrics_registry)
   252        self.ir_timer = Timer("IR", self.metrics_registry)
   253        self.econf_timer = Timer("EConf", self.metrics_registry)
   254        self.diag_timer = Timer("Diagnostics", self.metrics_registry)
   255
   256        # Use gauges to keep some metrics on active config
   257        self.diag_errors = Gauge(
   258            f"diagnostics_errors",
   259            f"Number of configuration errors",
   260            namespace="ambassador",
   261            registry=self.metrics_registry,
   262        )
   263        self.diag_notices = Gauge(
   264            f"diagnostics_notices",
   265            f"Number of configuration notices",
   266            namespace="ambassador",
   267            registry=self.metrics_registry,
   268        )
   269        self.diag_log_level = Gauge(
   270            f"log_level",
   271            f"Debug log level enabled or not",
   272            ["level"],
   273            namespace="ambassador",
   274            registry=self.metrics_registry,
   275        )
   276
   277        if debug:
   278            self.logger.setLevel(logging.DEBUG)
   279            self.diag_log_level.labels("debug").set(1)
   280            logging.getLogger("ambassador").setLevel(logging.DEBUG)
   281        else:
   282            self.diag_log_level.labels("debug").set(0)
   283
   284        # Assume that we will NOT update Mapping status.
   285        ksclass: Type[KubeStatus] = KubeStatusNoMappings
   286
   287        if os.environ.get("AMBASSADOR_UPDATE_MAPPING_STATUS", "false").lower() == "true":
   288            self.logger.info("WILL update Mapping status")
   289            ksclass = KubeStatus
   290        else:
   291            self.logger.info("WILL NOT update Mapping status")
   292
   293        self.kubestatus = ksclass(self)
   294
   295        self.config_path = config_path
   296        self.bootstrap_path = bootstrap_path
   297        self.ads_path = ads_path
   298        self.snapshot_path = snapshot_path
   299        self.clustermap_path = clustermap_path or os.path.join(
   300            os.path.dirname(self.bootstrap_path), "clustermap.json"
   301        )
   302
   303        # You must hold config_lock when updating config elements (including diag!).
   304        self.config_lock = threading.Lock()
   305
   306        # When generating new diagnostics, there's a dance around config_lock and
   307        # diag_lock -- see the diag() property.
   308        self.diag_lock = threading.Lock()
   309
   310        # Why are we doing this? Aren't we sure we're singlethreaded here?
   311        # Well, yes. But self.diag is actually a property, and it will raise an
   312        # assertion failure if we're not holding self.config_lock... and once
   313        # the lock is in play at all, we're gonna time it, in case my belief
   314        # that grabbing the lock here is always effectively free turns out to
   315        # be wrong.
   316
   317        with self.config_lock:
   318            self.ir = None  # don't update unless you hold config_lock
   319            self.econf = None  # don't update unless you hold config_lock
   320            self.diag = None  # don't update unless you hold config_lock
   321
   322        self.stats_updater = None
   323        self.scout_checker = None
   324
   325        self.last_request_info = {}
   326        self.last_request_time = None
   327
   328        # self.scout = Scout(update_frequency=datetime.timedelta(seconds=10))
   329        self.scout = Scout(local_only=self.local_scout)
   330
   331        ProcessCollector(namespace="ambassador", registry=self.metrics_registry)
   332        metrics_info = Info(
   333            name="diagnostics",
   334            namespace="ambassador",
   335            documentation="Ambassador diagnostic info",
   336            registry=self.metrics_registry,
   337        )
   338        metrics_info.info(
   339            {
   340                "version": __version__,
   341                "ambassador_id": Config.ambassador_id,
   342                "cluster_id": os.environ.get(
   343                    "AMBASSADOR_CLUSTER_ID",
   344                    os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
   345                ),
   346                "single_namespace": str(Config.single_namespace),
   347            }
   348        )
   349
   350    @property
   351    def diag(self) -> Optional[Diagnostics]:
   352        """
   353        It turns out to be expensive to generate the Diagnostics class, so
   354        app.diag is a property that does it on demand, handling Timers and
   355        the config lock for you.
   356
   357        You MUST NOT already hold the config_lock or the diag_lock when
   358        trying to read app.diag.
   359
   360        You MUST already have loaded an IR.
   361        """
   362
   363        # The config_lock is meant to make sure that we don't ever update
   364        # self.diag in two places at once, so grab that first.
   365        with self.config_lock:
   366            # If we've already generated diagnostics...
   367            if app._diag:
   368                # ...then we're good to go.
   369                return app._diag
   370
   371        # If here, we have _not_ generated diagnostics, and we've dropped the
   372        # config lock so as not to block anyone else. Next up: grab the diag
   373        # lock, because we'd rather not have two diag generations happening at
   374        # once.
   375        with self.diag_lock:
   376            # Did someone else sneak in between releasing the config lock and
   377            # grabbing the diag lock?
   378            if app._diag:
   379                # Yup. Use their work.
   380                return app._diag
   381
   382            # OK, go generate diagnostics.
   383            _diag = self._generate_diagnostics()
   384
   385            # If that didn't work, no point in messing with the config lock.
   386            if not _diag:
   387                return None
   388
   389            # Once here, we need to - once again - grab the config lock to update
   390            # app._diag. This is safe because this is the only place we ever mess
   391            # with the diag lock, so nowhere else will try to grab the diag lock
   392            # while holding the config lock.
   393            with app.config_lock:
   394                app._diag = _diag
   395
   396            # Finally, we can return app._diag to our caller.
   397            return app._diag
   398
   399    @diag.setter
   400    def diag(self, diag: Optional[Diagnostics]) -> None:
   401        """
   402        It turns out to be expensive to generate the Diagnostics class, so
   403        app.diag is a property that does it on demand, handling Timers and
   404        the config lock for you.
   405
   406        You MUST already hold the config_lock when trying to update app.diag.
   407        You MUST NOT hold the diag_lock.
   408        """
   409        self._diag = diag
   410
   411    def _generate_diagnostics(self) -> Optional[Diagnostics]:
   412        """
   413        Do the heavy lifting of generating Diagnostics for our current configuration.
   414        Really, only the diag() property should be calling this method, but if you
   415        are convinced that you need to call it from elsewhere:
   416
   417        1. You're almost certainly wrong about needing to call it from elsewhere.
   418        2. You MUST hold the diag_lock when calling this method.
   419        3. You MUST NOT hold the config_lock when calling this method.
   420        4. No, really, you're wrong. Don't call this method from anywhere but the
   421           diag() property.
   422        """
   423
   424        # Make sure we have an IR and econf to work with.
   425        if not app.ir or not app.econf:
   426            # Nope, bail.
   427            return None
   428
   429        # OK, go ahead and generate diagnostics. Use the diag_timer to time
   430        # this.
   431        with self.diag_timer:
   432            _diag = Diagnostics(app.ir, app.econf)
   433
   434            # Update some metrics data points given the new generated Diagnostics
   435            diag_dict = _diag.as_dict()
   436            self.diag_errors.set(len(diag_dict.get("errors", [])))
   437            self.diag_notices.set(len(diag_dict.get("notices", [])))
   438
   439            # Note that we've updated diagnostics, since that might trigger a
   440            # timer log.
   441            self.reconf_stats.mark("diag")
   442
   443            return _diag
   444
   445    def check_scout(self, what: str) -> None:
   446        self.watcher.post("SCOUT", (what, self.ir))
   447
   448    def post_timer_event(self) -> None:
   449        # Post an event to do a timer check.
   450        self.watcher.post("TIMER", None)
   451
   452    def check_timers(self) -> None:
   453        # Actually do the timer check.
   454
   455        if self.reconf_stats.needs_timers():
   456            # OK! Log the timers...
   457
   458            for t in [
   459                self.config_timer,
   460                self.fetcher_timer,
   461                self.aconf_timer,
   462                self.ir_timer,
   463                self.econf_timer,
   464                self.diag_timer,
   465            ]:
   466                if t:
   467                    self.logger.info(t.summary())
   468
   469            # ...and the cache statistics, if we can.
   470            if self.cache:
   471                self.cache.dump_stats()
   472
   473            # Always dump the reconfiguration stats...
   474            self.reconf_stats.dump()
   475
   476            # ...and mark that the timers have been logged.
   477            self.reconf_stats.mark_timers_logged()
   478
   479        # In this case we need to check to see if it's time to do a configuration
   480        # check, too.
   481        if self.reconf_stats.needs_check():
   482            result = False
   483
   484            try:
   485                result = self.check_cache()
   486            except Exception as e:
   487                tb = "\n".join(traceback.format_exception(*sys.exc_info()))
   488                self.logger.error("CACHE: CHECK FAILED: %s\n%s" % (e, tb))
   489
   490            # Mark that the check has happened.
   491            self.reconf_stats.mark_checked(result)
   492
   493    @staticmethod
   494    def json_diff(what: str, j1: str, j2: str) -> str:
   495        output = ""
   496
   497        l1 = j1.split("\n")
   498        l2 = j2.split("\n")
   499
   500        n1 = f"{what} incremental"
   501        n2 = f"{what} nonincremental"
   502
   503        output += "\n--------\n"
   504
   505        for line in difflib.context_diff(l1, l2, fromfile=n1, tofile=n2):
   506            line = line.rstrip()
   507            output += line
   508            output += "\n"
   509
   510        return output
   511
   512    def check_cache(self) -> bool:
   513        # We're going to build a shiny new IR and econf from our existing aconf, and make
   514        # sure everything matches. We will _not_ use the existing cache for this.
   515        #
   516        # For this, make sure we have an IR already...
   517        assert self.aconf
   518        assert self.ir
   519        assert self.econf
   520
   521        # Compute this IR/econf with a new empty cache. It saves a lot of trouble with
   522        # having to delete cache keys from the JSON.
   523
   524        self.logger.debug("CACHE: starting check")
   525        cache = Cache(self.logger)
   526        scc = SecretHandler(app.logger, "check_cache", app.snapshot_path, "check")
   527        ir = IR(self.aconf, secret_handler=scc, cache=cache)
   528        econf = EnvoyConfig.generate(ir, cache=cache)
   529
   530        # This is testing code.
   531        # name = list(ir.clusters.keys())[0]
   532        # del(ir.clusters[name])
   533
   534        i1 = self.ir.as_json()
   535        i2 = ir.as_json()
   536
   537        e1 = self.econf.as_json()
   538        e2 = econf.as_json()
   539
   540        result = True
   541        errors = ""
   542
   543        if i1 != i2:
   544            result = False
   545            self.logger.error("CACHE: IR MISMATCH")
   546            errors += "IR diffs:\n"
   547            errors += self.json_diff("IR", i1, i2)
   548
   549        if e1 != e2:
   550            result = False
   551            self.logger.error("CACHE: ENVOY CONFIG MISMATCH")
   552            errors += "econf diffs:\n"
   553            errors += self.json_diff("econf", i1, i2)
   554
   555        if not result:
   556            err_path = os.path.join(self.snapshot_path, "diff-tmp.txt")
   557
   558            open(err_path, "w").write(errors)
   559
   560            snapcount = int(os.environ.get("AMBASSADOR_SNAPSHOT_COUNT", "4"))
   561            snaplist: List[Tuple[str, str]] = []
   562
   563            if snapcount > 0:
   564                # If snapcount is 4, this range statement becomes range(-4, -1)
   565                # which gives [ -4, -3, -2 ], which the list comprehension turns
   566                # into [ ( "-3", "-4" ), ( "-2", "-3" ), ( "-1", "-2" ) ]...
   567                # which is the list of suffixes to rename to rotate the snapshots.
   568
   569                snaplist += [(str(x + 1), str(x)) for x in range(-1 * snapcount, -1)]
   570
   571                # After dealing with that, we need to rotate the current file into -1.
   572                snaplist.append(("", "-1"))
   573
   574            # Whether or not we do any rotation, we need to cycle in the '-tmp' file.
   575            snaplist.append(("-tmp", ""))
   576
   577            for from_suffix, to_suffix in snaplist:
   578                from_path = os.path.join(app.snapshot_path, "diff{}.txt".format(from_suffix))
   579                to_path = os.path.join(app.snapshot_path, "diff{}.txt".format(to_suffix))
   580
   581                try:
   582                    self.logger.debug("rotate: %s -> %s" % (from_path, to_path))
   583                    os.rename(from_path, to_path)
   584                except IOError as e:
   585                    self.logger.debug("skip %s -> %s: %s" % (from_path, to_path, e))
   586                except Exception as e:
   587                    self.logger.debug("could not rename %s -> %s: %s" % (from_path, to_path, e))
   588
   589        self.logger.info("CACHE: check %s" % ("succeeded" if result else "failed"))
   590
   591        return result
   592
   593
   594# get the "templates" directory, or raise "FileNotFoundError" if not found
   595def get_templates_dir():
   596    res_dir = None
   597    try:
   598        # this will fail when not in a distribution
   599        res_dir = resource_filename(Requirement.parse("ambassador"), "templates")
   600    except:
   601        pass
   602
   603    maybe_dirs = [res_dir, os.path.join(os.path.dirname(__file__), "..", "templates")]
   604    for d in maybe_dirs:
   605        if d and os.path.isdir(d):
   606            return d
   607    raise FileNotFoundError
   608
   609
# Get the Flask app defined early. Setup happens later.
# This is the module-level application object that the decorators and helper
# functions in this file refer to as `app`. Note that get_templates_dir()
# runs right here, so a missing templates directory fails at import time.
app = DiagApp(__name__, template_folder=get_templates_dir())
   612
   613
   614######## DECORATORS
   615
   616
   617def standard_handler(f):
   618    func_name = getattr(f, "__name__", "<anonymous>")
   619
   620    @functools.wraps(f)
   621    def wrapper(*args, **kwds):
   622        reqid = str(uuid.uuid4()).upper()
   623        prefix = '%s: %s "%s %s"' % (reqid, request.remote_addr, request.method, request.path)
   624
   625        app.logger.debug("%s START" % prefix)
   626
   627        start = datetime.datetime.now()
   628
   629        app.logger.debug("%s handler %s" % (prefix, func_name))
   630
   631        # Getting elements in the `tvars_cache` will make sure eviction happens on `max_age_seconds` TTL
   632        # for removed patch_client rather than waiting to fill `max_len`.
   633        # Looping over a copied list of keys, to prevent mutating tvars_cache while iterating.
   634        for k in list(tvars_cache.keys()):
   635            tvars_cache.get(k)
   636
   637        # Default to the exception case
   638        result_to_log = "server error"
   639        status_to_log = 500
   640        result_log_level = logging.ERROR
   641        result = Response(result_to_log, status_to_log)
   642
   643        try:
   644            result = f(*args, reqid=reqid, **kwds)
   645            if not isinstance(result, Response):
   646                result = Response(f"Invalid handler result {result}", status_to_log)
   647
   648            status_to_log = result.status_code
   649
   650            if (status_to_log // 100) == 2:
   651                result_log_level = logging.INFO
   652                result_to_log = "success"
   653            else:
   654                result_log_level = logging.ERROR
   655                result_to_log = "failure"
   656        except Exception as e:
   657            app.logger.exception(e)
   658
   659        end = datetime.datetime.now()
   660        ms = int(((end - start).total_seconds() * 1000) + 0.5)
   661
   662        app.logger.log(
   663            result_log_level, "%s %dms %d %s" % (prefix, ms, status_to_log, result_to_log)
   664        )
   665
   666        return result
   667
   668    return wrapper
   669
   670
   671def internal_handler(f):
   672    """
   673    Reject requests where the remote address is not localhost. See the docstring
   674    for _is_local_request() for important caveats!
   675    """
   676    func_name = getattr(f, "__name__", "<anonymous>")
   677
   678    @functools.wraps(f)
   679    def wrapper(*args, **kwds):
   680        if not _is_local_request():
   681            return "Forbidden\n", 403
   682        return f(*args, **kwds)
   683
   684    return wrapper
   685
   686
   687######## UTILITIES
   688
   689
   690def _is_local_request() -> bool:
   691    """
   692    Determine if this request originated with localhost.
   693
   694    We rely on healthcheck_server.go setting the X-Ambassador-Diag-IP header for us
   695    (and we rely on it overwriting anything that's already there!).
   696
   697    It might be possible to consider the environment variables SERVER_NAME and
   698    SERVER_PORT instead, as those are allegedly required by WSGI... but attempting
   699    to do so in Flask/GUnicorn yielded a worse implementation that was still not
   700    portable.
   701
   702    """
   703
   704    remote_addr: Optional[str] = ""
   705
   706    remote_addr = request.headers.get("X-Ambassador-Diag-IP")
   707
   708    return remote_addr == "127.0.0.1"
   709
   710
   711def _allow_diag_ui() -> bool:
   712    """
   713    Helper function to check if diag ui traffic is allowed or not
   714    based on the different flags from the config:
   715    * diagnostics.enabled: Enable to diag UI by adding mappings
   716    * diagnostics.allow_non_local: Allow non local traffic
   717                                   even when diagnotics UI is disabled.
   718                                   Mappings are not added for the diag UI
   719                                   but the diagnotics UI is still exposed for
   720                                   the pod IP in the admin port.
   721    * local traffic or not: When diagnotics disagled and allow_non_local is false,
   722                            allow traffic only from localhost clients
   723    """
   724    enabled = False
   725    allow_non_local = False
   726    ir = app.ir
   727    if ir:
   728        enabled = ir.ambassador_module.diagnostics.get("enabled", False)
   729        allow_non_local = ir.ambassador_module.diagnostics.get("allow_non_local", False)
   730    if not enabled and not _is_local_request() and not allow_non_local:
   731        return False
   732    return True
   733
   734
   735class Notices:
   736    def __init__(self, local_config_path: str) -> None:
   737        self.local_path = local_config_path
   738        self.notices: List[Dict[str, str]] = []
   739
   740    def reset(self):
   741        local_notices: List[Dict[str, str]] = []
   742        local_data = ""
   743
   744        try:
   745            local_stream = open(self.local_path, "r")
   746            local_data = local_stream.read()
   747            local_notices = parse_json(local_data)
   748        except OSError:
   749            pass
   750        except:
   751            local_notices.append(
   752                {"level": "ERROR", "message": "bad local notices: %s" % local_data}
   753            )
   754
   755        self.notices = local_notices
   756        # app.logger.info("Notices: after RESET: %s" % dump_json(self.notices))
   757
   758    def post(self, notice):
   759        # app.logger.debug("Notices: POST %s" % notice)
   760        self.notices.append(notice)
   761        # app.logger.info("Notices: after POST: %s" % dump_json(self.notices))
   762
   763    def prepend(self, notice):
   764        # app.logger.debug("Notices: PREPEND %s" % notice)
   765        self.notices.insert(0, notice)
   766        # app.logger.info("Notices: after PREPEND: %s" % dump_json(self.notices))
   767
   768    def extend(self, notices):
   769        for notice in notices:
   770            self.post(notice)
   771
   772
   773def td_format(td_object):
   774    seconds = int(td_object.total_seconds())
   775    periods = [
   776        ("year", 60 * 60 * 24 * 365),
   777        ("month", 60 * 60 * 24 * 30),
   778        ("day", 60 * 60 * 24),
   779        ("hour", 60 * 60),
   780        ("minute", 60),
   781        ("second", 1),
   782    ]
   783
   784    strings = []
   785    for period_name, period_seconds in periods:
   786        if seconds > period_seconds:
   787            period_value, seconds = divmod(seconds, period_seconds)
   788
   789            strings.append(
   790                "%d %s%s" % (period_value, period_name, "" if (period_value == 1) else "s")
   791            )
   792
   793    formatted = ", ".join(strings)
   794
   795    if not formatted:
   796        formatted = "0s"
   797
   798    return formatted
   799
   800
   801def interval_format(seconds, normal_format, now_message):
   802    if seconds >= 1:
   803        return normal_format % td_format(datetime.timedelta(seconds=seconds))
   804    else:
   805        return now_message
   806
   807
   808def system_info(app):
   809    ir = app.ir
   810    debug_mode = False
   811
   812    if ir:
   813        amod = ir.ambassador_module
   814        debug_mode = amod.get("debug_mode", False)
   815
   816        app.logger.debug(f"DEBUG_MODE {debug_mode}")
   817
   818    status_dict = {"config failure": [False, "no configuration loaded"]}
   819
   820    env_status = getattr(app.watcher, "env_status", None)
   821
   822    if env_status:
   823        status_dict = env_status.to_dict()
   824        app.logger.debug(f"status_dict {status_dict}")
   825
   826    return {
   827        "version": __version__,
   828        "hostname": SystemInfo.MyHostName,
   829        "ambassador_id": Config.ambassador_id,
   830        "ambassador_namespace": Config.ambassador_namespace,
   831        "single_namespace": Config.single_namespace,
   832        "knative_enabled": os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "").lower() == "true",
   833        "statsd_enabled": os.environ.get("STATSD_ENABLED", "").lower() == "true",
   834        "endpoints_enabled": Config.enable_endpoints,
   835        "cluster_id": os.environ.get(
   836            "AMBASSADOR_CLUSTER_ID",
   837            os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
   838        ),
   839        "boot_time": boot_time,
   840        "hr_uptime": td_format(datetime.datetime.now() - boot_time),
   841        "latest_snapshot": app.latest_snapshot,
   842        "env_good": getattr(app.watcher, "env_good", False),
   843        "env_failures": getattr(app.watcher, "failure_list", ["no IR loaded"]),
   844        "env_status": status_dict,
   845        "debug_mode": debug_mode,
   846    }
   847
   848
   849def envoy_status(estats: EnvoyStats):
   850    since_boot = interval_format(estats.time_since_boot(), "%s", "less than a second")
   851
   852    since_update = "Never updated"
   853
   854    if estats.time_since_update():
   855        since_update = interval_format(
   856            estats.time_since_update(), "%s ago", "within the last second"
   857        )
   858
   859    return {
   860        "alive": estats.is_alive(),
   861        "ready": estats.is_ready(),
   862        "uptime": since_boot,
   863        "since_update": since_update,
   864    }
   865
   866
   867def drop_serializer_key(d: Dict[Any, Any]) -> Dict[Any, Any]:
   868    """
   869    Delete the "serialization" key (if present) in any dictionary passed in and
   870    return that dictionary. This function is intended to be used as the
   871    object_hook value for json.load[s].
   872    """
   873    _ = d.pop("serialization", None)
   874    return d
   875
   876
   877def filter_keys(d: Dict[Any, Any], keys_to_keep):
   878    unwanted_keys = set(d) - set(keys_to_keep)
   879    for unwanted_key in unwanted_keys:
   880        del d[unwanted_key]
   881
   882
   883def filter_webui(d: Dict[Any, Any]):
   884    filter_keys(
   885        d,
   886        [
   887            "system",
   888            "route_info",
   889            "source_map",
   890            "ambassador_resolvers",
   891            "ambassador_services",
   892            "envoy_status",
   893            "cluster_stats",
   894            "loginfo",
   895            "errors",
   896        ],
   897    )
   898    for ambassador_resolver in d["ambassador_resolvers"]:
   899        filter_keys(ambassador_resolver, ["_source", "kind"])
   900    for route_info in d["route_info"]:
   901        filter_keys(route_info, ["diag_class", "key", "headers", "precedence", "clusters"])
   902        for cluster in route_info["clusters"]:
   903            filter_keys(cluster, ["_hcolor", "type_label", "service", "weight"])
   904
   905
   906@app.route("/_internal/v0/ping", methods=["GET"])
   907@internal_handler
   908def handle_ping():
   909    return "ACK\n", 200
   910
   911
   912@app.route("/_internal/v0/features", methods=["GET"])
   913@internal_handler
   914def handle_features():
   915    # If we don't have an IR yet, do nothing.
   916    #
   917    # We don't bother grabbing the config_lock here because we're not changing
   918    # anything, and an features request hitting at exactly the same moment as
   919    # the first configure is a race anyway. If it fails, that's not a big deal,
   920    # they can try again.
   921    if not app.ir:
   922        app.logger.debug("Features: configuration required first")
   923        return "Can't do features before configuration", 503
   924
   925    return jsonify(app.ir.features()), 200
   926
   927
   928@app.route("/_internal/v0/watt", methods=["POST"])
   929@internal_handler
   930def handle_watt_update():
   931    url = request.args.get("url", None)
   932
   933    if not url:
   934        app.logger.error("error: watt update requested with no URL")
   935        return "error: watt update requested with no URL\n", 400
   936
   937    app.logger.debug("Update requested: watt, %s" % url)
   938
   939    status, info = app.watcher.post("CONFIG", ("watt", url))
   940
   941    return info, status
   942
   943
   944@app.route("/_internal/v0/fs", methods=["POST"])
   945@internal_handler
   946def handle_fs():
   947    path = request.args.get("path", None)
   948
   949    if not path:
   950        app.logger.error("error: update requested with no PATH")
   951        return "error: update requested with no PATH\n", 400
   952
   953    app.logger.debug("Update requested from %s" % path)
   954
   955    status, info = app.watcher.post("CONFIG_FS", path)
   956
   957    return info, status
   958
   959
   960@app.route("/_internal/v0/events", methods=["GET"])
   961@internal_handler
   962def handle_events():
   963    if not app.local_scout:
   964        return "Local Scout is not enabled\n", 400
   965    assert isinstance(app.scout._scout, LocalScout)
   966
   967    event_dump = [
   968        (x["local_scout_timestamp"], x["mode"], x["action"], x) for x in app.scout._scout.events
   969    ]
   970
   971    app.logger.debug(f"Event dump {event_dump}")
   972
   973    return jsonify(event_dump)
   974
   975
   976@app.route("/ambassador/v0/favicon.ico", methods=["GET"])
   977def favicon():
   978    template_path = resource_filename(Requirement.parse("ambassador"), "templates")
   979
   980    return send_from_directory(template_path, "favicon.ico")
   981
   982
   983@app.route("/ambassador/v0/check_alive", methods=["GET"])
   984def check_alive():
   985    status = envoy_status(app.estatsmgr.get_stats())
   986
   987    if status["alive"]:
   988        return "ambassador liveness check OK (%s)\n" % status["uptime"], 200
   989    else:
   990        return "ambassador seems to have died (%s)\n" % status["uptime"], 503
   991
   992
   993@app.route("/ambassador/v0/check_ready", methods=["GET"])
   994def check_ready():
   995    if not app.ir:
   996        return "ambassador waiting for config\n", 503
   997
   998    status = envoy_status(app.estatsmgr.get_stats())
   999
  1000    if status["ready"]:
  1001        return "ambassador readiness check OK (%s)\n" % status["since_update"], 200
  1002    else:
  1003        return "ambassador not ready (%s)\n" % status["since_update"], 503
  1004
  1005
@app.route("/ambassador/v0/diag/", methods=["GET"])
@standard_handler
def show_overview(reqid=None):
    """
    Serve the diagnostics overview, as HTML or (with the "json" query
    argument) as JSON.

    Responds 503 while no IR is loaded and 404 when _allow_diag_ui() says no.

    Query arguments read here:
      json         -- if set, answer with JSON instead of rendering overview.html
      filter       -- with json: "webui" trims the data via filter_webui();
                      any other value returns only that top-level key
      patch_client -- with json: answer with a JSON Patch against the last
                      representation stored for this client in tvars_cache
    """
    # If we don't have an IR yet, do nothing.
    #
    # We don't bother grabbing the config_lock here because we're not changing
    # anything, and an overview request hitting at exactly the same moment as
    # the first configure is a race anyway. If it fails, that's not a big deal,
    # they can try again.
    if not app.ir:
        app.logger.debug("OV %s - can't do overview before configuration" % reqid)
        return "Can't do overview before configuration", 503

    if not _allow_diag_ui():
        return Response("Not found\n", 404)

    app.logger.debug("OV %s - showing overview" % reqid)

    # Remember that app.diag is a property that can involve some real expense
    # to compute -- we don't want to call it more than once here, so we cache
    # its value.
    diag = app.diag
    assert diag

    if app.verbose:
        app.logger.debug("OV %s: DIAG" % reqid)
        app.logger.debug("%s" % dump_json(diag.as_dict(), pretty=True))

    estats = app.estatsmgr.get_stats()
    ov = diag.overview(request, estats)

    if app.verbose:
        app.logger.debug("OV %s: OV" % reqid)
        app.logger.debug("%s" % dump_json(ov, pretty=True))
        app.logger.debug("OV %s: collecting errors" % reqid)

    # This call also posts notices to app.notices, so it has to happen before
    # tvars (which reads app.notices.notices) is built below.
    ddict = collect_errors_and_notices(request, reqid, "overview", diag)

    # Optionally fetch banner content. Any failure is logged and the page is
    # served without a banner.
    #
    # NOTE(review): requests.get() is called without a timeout, so a banner
    # endpoint that never answers would stall this request -- consider
    # passing one.
    banner_content = None
    if app.banner_endpoint and app.ir and app.ir.edge_stack_allowed:
        try:
            response = requests.get(app.banner_endpoint)
            if response.status_code == 200:
                banner_content = response.text
        except Exception as e:
            app.logger.error("could not get banner_content: %s" % e)

    # Everything the template (or the JSON response) gets to see.
    tvars = dict(
        system=system_info(app),
        envoy_status=envoy_status(estats),
        loginfo=app.estatsmgr.loginfo,
        notices=app.notices.notices,
        banner_content=banner_content,
        **ov,
        **ddict,
    )

    patch_client = request.args.get("patch_client", None)
    if request.args.get("json", None):
        filter_key = request.args.get("filter", None)

        if filter_key == "webui":
            # Trim tvars in place, then fall through to the patch/full logic.
            filter_webui(tvars)
        elif filter_key:
            # Any other filter returns just that one top-level value.
            return jsonify(tvars.get(filter_key, None))

        if patch_client:
            # Assume this is the Admin UI. Recursively drop all "serialization"
            # keys. This avoids leaking secrets and generally makes the
            # snapshot a lot smaller without losing information that the Admin
            # UI cares about. We do this below by setting the object_hook
            # parameter of the json.loads(...) call. We have to use python's
            # json library instead of orjson, because orjson does not support
            # the object_hook feature.

            # Get the previous full representation
            cached_tvars_json = tvars_cache.get(patch_client, dict())
            # Serialize the tvars into a json-string using the same jsonify Flask serializer, then load the json object
            response_content = json.loads(flask_json.dumps(tvars), object_hook=drop_serializer_key)
            # Diff between the previous representation and the current full representation  (http://jsonpatch.com/)
            patch = jsonpatch.make_patch(cached_tvars_json, response_content)
            # Save the current full representation in memory
            tvars_cache[patch_client] = response_content

            # Return only the diff
            return Response(patch.to_string(), mimetype="application/json")
        else:
            return jsonify(tvars)
    else:
        app.check_scout("overview")
        return Response(render_template("overview.html", **tvars))
  1097
  1098
  1099def collect_errors_and_notices(request, reqid, what: str, diag: Diagnostics) -> Dict:
  1100    loglevel = request.args.get("loglevel", None)
  1101    notice = None
  1102
  1103    if loglevel:
  1104        app.logger.debug("%s %s -- requesting loglevel %s" % (what, reqid, loglevel))
  1105        app.diag_log_level.labels("debug").set(1 if loglevel == "debug" else 0)
  1106
  1107        if not app.estatsmgr.update_log_levels(time.time(), level=loglevel):
  1108            notice = {"level": "WARNING", "message": "Could not update log level!"}
  1109        # else:
  1110        #     return redirect("/ambassador/v0/diag/", code=302)
  1111
  1112    # We need to grab errors and notices from diag.as_dict(), process the errors so
  1113    # they work for the HTML rendering, and post the notices to app.notices. Then we
  1114    # return the dict representation that our caller should work with.
  1115
  1116    ddict = diag.as_dict()
  1117
  1118    # app.logger.debug("ddict %s" % dump_json(ddict, pretty=True))
  1119
  1120    derrors = ddict.pop("errors", {})
  1121
  1122    errors = []
  1123
  1124    for err_key, err_list in derrors.items():
  1125        if err_key == "-global-":
  1126            err_key = ""
  1127
  1128        for err in err_list:
  1129            errors.append((err_key, err["error"]))
  1130
  1131    dnotices = ddict.pop("notices", {})
  1132
  1133    # Make sure that anything about the loglevel gets folded into this set.
  1134    if notice:
  1135        app.notices.prepend(notice)
  1136
  1137    for notice_key, notice_list in dnotices.items():
  1138        for notice in notice_list:
  1139            app.notices.post({"level": "NOTICE", "message": "%s: %s" % (notice_key, notice)})
  1140
  1141    ddict["errors"] = errors
  1142
  1143    return ddict
  1144
  1145
  1146@app.route("/ambassador/v0/diag/<path:source>", methods=["GET"])
  1147@standard_handler
  1148def show_intermediate(source=None, reqid=None):
  1149    # If we don't have an IR yet, do nothing.
  1150    #
  1151    # We don't bother grabbing the config_lock here because we're not changing
  1152    # anything, and an overview request hitting at exactly the same moment as
  1153    # the first configure is a race anyway. If it fails, that's not a big deal,
  1154    # they can try again.
  1155    if not app.ir:
  1156        app.logger.debug(
  1157            "SRC %s - can't do intermediate for %s before configuration" % (reqid, source)
  1158        )
  1159        return "Can't do overview before configuration", 503
  1160
  1161    if not _allow_diag_ui():
  1162        return Response("Not found\n", 404)
  1163
  1164    app.logger.debug("SRC %s - getting intermediate for '%s'" % (reqid, source))
  1165
  1166    # Remember that app.diag is a property that can involve some real expense
  1167    # to compute -- we don't want to call it more than once here, so we cache
  1168    # its value.
  1169    diag = app.diag
  1170    assert diag
  1171
  1172    method = request.args.get("method", None)
  1173    resource = request.args.get("resource", None)
  1174
  1175    estats = app.estatsmgr.get_stats()
  1176    result = diag.lookup(request, source, estats)
  1177    assert result
  1178
  1179    if app.verbose:
  1180        app.logger.debug("RESULT %s" % dump_json(result, pretty=True))
  1181
  1182    ddict = collect_errors_and_notices(request, reqid, "detail %s" % source, diag)
  1183
  1184    tvars = {
  1185        "system": system_info(app),
  1186        "envoy_status": envoy_status(estats),
  1187        "loginfo": app.estatsmgr.loginfo,
  1188        "notices": app.notices.notices,
  1189        "method": method,
  1190        "resource": resource,
  1191        **result,
  1192        **ddict,
  1193    }
  1194
  1195    if request.args.get("json", None):
  1196        key = request.args.get("filter", None)
  1197
  1198        if key:
  1199            return jsonify(tvars.get(key, None))
  1200        else:
  1201            return jsonify(tvars)
  1202    else:
  1203        app.check_scout("detail: %s" % source)
  1204        return Response(render_template("diag.html", **tvars))
  1205
  1206
  1207@app.template_filter("sort_by_key")
  1208def sort_by_key(objects):
  1209    return sorted(objects, key=lambda x: x["key"])
  1210
  1211
  1212@app.template_filter("pretty_json")
  1213def pretty_json(obj):
  1214    if isinstance(obj, dict):
  1215        obj = dict(**obj)
  1216
  1217        keys_to_drop = [key for key in obj.keys() if key.startswith("_")]
  1218
  1219        for key in keys_to_drop:
  1220            del obj[key]
  1221
  1222    return dump_json(obj, pretty=True)
  1223
  1224
  1225@app.template_filter("sort_clusters_by_service")
  1226def sort_clusters_by_service(clusters):
  1227    return sorted(clusters, key=lambda x: x["service"])
  1228    # return sorted([ c for c in clusters.values() ], key=lambda x: x['service'])
  1229
  1230
  1231@app.template_filter("source_lookup")
  1232def source_lookup(name, sources):
  1233    app.logger.debug("%s => sources %s" % (name, sources))
  1234
  1235    source = sources.get(name, {})
  1236
  1237    app.logger.debug("%s => source %s" % (name, source))
  1238
  1239    return source.get("_source", name)
  1240
  1241
  1242@app.route("/metrics", methods=["GET"])
  1243@standard_handler
  1244def get_prometheus_metrics(*args, **kwargs):
  1245    # Envoy metrics
  1246    envoy_metrics = app.estatsmgr.get_prometheus_stats()
  1247
  1248    # Ambassador OSS metrics
  1249    ambassador_metrics = generate_latest(registry=app.metrics_registry).decode("utf-8")
  1250
  1251    # Extra metrics endpoint
  1252    extra_metrics_content = ""
  1253    if app.metrics_endpoint and app.ir and app.ir.edge_stack_allowed:
  1254        try:
  1255            response = requests.get(app.metrics_endpoint)
  1256            if response.status_code == 200:
  1257                extra_metrics_content = response.text
  1258        except Exception as e:
  1259            app.logger.error("could not get metrics_endpoint: %s" % e)
  1260
  1261    return Response(
  1262        "".join([envoy_metrics, ambassador_metrics, extra_metrics_content]).encode("utf-8"),
  1263        200,
  1264        mimetype="text/plain",
  1265    )
  1266
  1267
  1268def bool_fmt(b: bool) -> str:
  1269    return "T" if b else "F"
  1270
  1271
  1272class StatusInfo:
  1273    def __init__(self) -> None:
  1274        self.status = True
  1275        self.specifics: List[Tuple[bool, str]] = []
  1276
  1277    def failure(self, message: str) -> None:
  1278        self.status = False
  1279        self.specifics.append((False, message))
  1280
  1281    def OK(self, message: str) -> None:
  1282        self.specifics.append((True, message))
  1283
  1284    def to_dict(self) -> Dict[str, Union[bool, List[Tuple[bool, str]]]]:
  1285        return {"status": self.status, "specifics": self.specifics}
  1286
  1287
  1288class SystemStatus:
  1289    def __init__(self) -> None:
  1290        self.status: Dict[str, StatusInfo] = {}
  1291
  1292    def failure(self, key: str, message: str) -> None:
  1293        self.info_for_key(key).failure(message)
  1294
  1295    def OK(self, key: str, message: str) -> None:
  1296        self.info_for_key(key).OK(message)
  1297
  1298    def info_for_key(self, key) -> StatusInfo:
  1299        if key not in self.status:
  1300            self.status[key] = StatusInfo()
  1301
  1302        return self.status[key]
  1303
  1304    def to_dict(self) -> Dict[str, Dict[str, Union[bool, List[Tuple[bool, str]]]]]:
  1305        return {key: info.to_dict() for key, info in self.status.items()}
  1306
  1307
  1308class KubeStatus:
  1309    pool: concurrent.futures.ProcessPoolExecutor
  1310
  1311    def __init__(self, app) -> None:
  1312        self.app = app
  1313        self.logger = app.logger
  1314        self.live: Dict[str, bool] = {}
  1315        self.current_status: Dict[str, str] = {}
  1316        self.pool = concurrent.futures.ProcessPoolExecutor(max_workers=5)
  1317
  1318    def mark_live(self, kind: str, name: str, namespace: str) -> None:
  1319        key = f"{kind}/{name}.{namespace}"
  1320
  1321        # self.logger.debug(f"KubeStatus MASTER {os.getpid()}: mark_live {key}")
  1322        self.live[key] = True
  1323
  1324    def prune(self) -> None:
  1325        drop: List[str] = []
  1326
  1327        for key in self.current_status.keys():
  1328            if not self.live.get(key, False):
  1329                drop.append(key)
  1330
  1331        for key in drop:
  1332            # self.logger.debug(f"KubeStatus MASTER {os.getpid()}: prune {key}")
  1333            del self.current_status[key]
  1334
  1335        self.live = {}
  1336
  1337    def post(self, kind: str, name: str, namespace: str, text: str) -> None:
  1338        key = f"{kind}/{name}.{namespace}"
  1339        extant = self.current_status.get(key, None)
  1340
  1341        if extant == text:
  1342            # self.logger.info(f"KubeStatus MASTER {os.getpid()}: {key} == {text}")
  1343            pass
  1344        else:
  1345            # self.logger.info(f"KubeStatus MASTER {os.getpid()}: {key} needs {text}")
  1346
  1347            # For now we're going to assume that this works.
  1348            self.current_status[key] = text
  1349            f = self.pool.submit(kubestatus_update, kind, name, namespace, text)
  1350            f.add_done_callback(kubestatus_update_done)
  1351
  1352
  1353# The KubeStatusNoMappings class clobbers the mark_live() method of the
  1354# KubeStatus class, so that updates to Mappings don't actually have any
  1355# effect, but updates to Ingress (for example) do.
  1356class KubeStatusNoMappings(KubeStatus):
  1357    def mark_live(self, kind: str, name: str, namespace: str) -> None:
  1358        pass
  1359
  1360    def post(self, kind: str, name: str, namespace: str, text: str) -> None:
  1361        # There's a path (via IRBaseMapping.check_status) where a Mapping
  1362        # can be added directly to ir.k8s_status_updates, which will come
  1363        # straight here without mark_live being involved -- so short-circuit
  1364        # here for Mappings, too.
  1365
  1366        if kind == "Mapping":
  1367            return
  1368
  1369        super().post(kind, name, namespace, text)
  1370
  1371
  1372def kubestatus_update(kind: str, name: str, namespace: str, text: str) -> str:
  1373    cmd = [
  1374        "kubestatus",
  1375        "--cache-dir",
  1376        "/tmp/client-go-http-cache",
  1377        kind,
  1378        name,
  1379        "-n",
  1380        namespace,
  1381        "-u",
  1382        "/dev/fd/0",
  1383    ]
  1384    # print(f"KubeStatus UPDATE {os.getpid()}: running command: {cmd}")
  1385
  1386    try:
  1387        rc = subprocess.run(
  1388            cmd,
  1389            input=text.encode("utf-8"),
  1390            stdout=subprocess.PIPE,
  1391            stderr=subprocess.STDOUT,
  1392            timeout=5,
  1393        )
  1394        if rc.returncode == 0:
  1395            return f"{name}.{namespace}: update OK"
  1396        else:
  1397            return f"{name}.{namespace}: error {rc.returncode}"
  1398
  1399    except subprocess.TimeoutExpired as e:
  1400        return f"{name}.{namespace}: timed out\n\n{e.output}"
  1401
  1402
  1403def kubestatus_update_done(f: concurrent.futures.Future) -> None:
  1404    # print(f"KubeStatus DONE {os.getpid()}: result {f.result()}")
  1405    pass
  1406
  1407
  1408class AmbassadorEventWatcher(threading.Thread):
  1409    # The key for 'Actions' is chimed - chimed_ok - env_good. This will make more sense
  1410    # if you read through the _load_ir method.
  1411
  1412    Actions = {
  1413        "F-F-F": ("unhealthy", True),  # make sure the first chime always gets out
  1414        "F-F-T": ("now-healthy", True),  # make sure the first chime always gets out
  1415        "F-T-F": ("now-unhealthy", True),  # this is actually impossible
  1416        "F-T-T": ("healthy", True),  # this is actually impossible
  1417        "T-F-F": ("unhealthy", False),
  1418        "T-F-T": ("now-healthy", True),
  1419        "T-T-F": ("now-unhealthy", True),
  1420        "T-T-T": ("update", False),
  1421    }
  1422
  1423    reCompressed = re.compile(r"-\d+$")
  1424
  1425    def __init__(self, app: DiagApp) -> None:
  1426        super().__init__(name="AEW", daemon=True)
  1427        self.app = app
  1428        self.logger = self.app.logger
  1429        self.events: queue.Queue = queue.Queue()
  1430
  1431        self.chimed = False  # Have we ever sent a chime about the environment?
  1432        self.last_chime = False  # What was the status of our last chime? (starts as False)
  1433        self.env_good = False  # Is our environment currently believed to be OK?
  1434        self.failure_list: List[str] = [
  1435            "unhealthy at boot"
  1436        ]  # What's making our environment not OK?
  1437
  1438    def post(
  1439        self, cmd: str, arg: Optional[Union[str, Tuple[str, Optional[IR]], Tuple[str, str]]]
  1440    ) -> Tuple[int, str]:
  1441        rqueue: queue.Queue = queue.Queue()
  1442
  1443        self.events.put((cmd, arg, rqueue))
  1444
  1445        return rqueue.get()
  1446
  1447    def update_estats(self) -> None:
  1448        self.app.estatsmgr.update()
  1449
    def run(self):
        """
        Thread body: start the periodic triggers, then process events from
        self.events forever.

        Each event is a (cmd, arg, rqueue) tuple as queued by post(). The
        loop dispatches on cmd ("CONFIG_FS", "CONFIG", "SCOUT", "TIMER") and
        answers through _respond(rqueue, ...). Exceptions raised by a handler
        are logged and, except for "TIMER", answered with a 500. The loop
        never exits on its own.
        """
        self.logger.info("starting Scout checker and timer logger")
        self.app.scout_checker = PeriodicTrigger(
            lambda: self.check_scout("checkin"), period=86400
        )  # Yup, one day.
        self.app.timer_logger = PeriodicTrigger(self.app.post_timer_event, period=1)

        self.logger.info("starting event watcher")

        while True:
            # Blocks until something is queued.
            cmd, arg, rqueue = self.events.get()
            # self.logger.info("EVENT: %s" % cmd)

            if cmd == "CONFIG_FS":
                # arg is the path (or test command) for load_config_fs, which
                # is expected to answer on rqueue itself; we only answer here
                # if it raises.
                try:
                    self.load_config_fs(rqueue, arg)
                except Exception as e:
                    self.logger.error("could not reconfigure: %s" % e)
                    self.logger.exception(e)
                    self._respond(rqueue, 500, "configuration from filesystem failed")
            elif cmd == "CONFIG":
                # arg is a (version, url) pair; "watt" is the only version
                # supported.
                version, url = arg

                try:
                    if version == "watt":
                        self.load_config_watt(rqueue, url)
                    else:
                        raise RuntimeError("config from %s not supported" % version)
                except Exception as e:
                    self.logger.error("could not reconfigure: %s" % e)
                    self.logger.exception(e)
                    self._respond(rqueue, 500, "configuration failed")
            elif cmd == "SCOUT":
                # Answer first, then do the work: post() is blocked waiting on
                # rqueue, so this releases the poster before the Scout check.
                #
                # NOTE(review): if check_scout raises, the 500 below is a
                # second reply on rqueue; post() reads only one item, so it
                # looks like that reply is never seen.
                try:
                    self._respond(rqueue, 200, "checking Scout")
                    self.check_scout(*arg)
                except Exception as e:
                    self.logger.error("could not reconfigure: %s" % e)
                    self.logger.exception(e)
                    self._respond(rqueue, 500, "scout check failed")
            elif cmd == "TIMER":
                # Same answer-first pattern; failures are only logged.
                try:
                    self._respond(rqueue, 200, "done")
                    self.app.check_timers()
                except Exception as e:
                    self.logger.error("could not check timers? %s" % e)
                    self.logger.exception(e)
            else:
                self.logger.error(f"unknown event type: '{cmd}' '{arg}'")
                self._respond(rqueue, 400, f"unknown event type '{cmd}' '{arg}'")
  1500
  1501    def _respond(self, rqueue: queue.Queue, status: int, info="") -> None:
  1502        # self.logger.debug("responding to query with %s %s" % (status, info))
  1503        rqueue.put((status, info))
  1504
  1505    # load_config_fs reconfigures from the filesystem. It's _mostly_ legacy
  1506    # code, but not entirely, since Docker demo mode still uses it.
  1507    #
  1508    # BE CAREFUL ABOUT STOPPING THE RECONFIGURATION TIMER ONCE IT IS STARTED.
    def load_config_fs(self, rqueue: queue.Queue, path: str) -> None:
        """
        Reconfigure from the filesystem, or run a test command.

        When app.allow_fs_commands is set and path contains a ":", path is
        treated as "<prefix>:<rest>". Only the prefix "cmd" (any case) is
        accepted; the command name is the part of <rest> before its first
        ":". Known commands are CHIME, CHIME_RESET, SCOUT_CACHE_RESET, ENV_OK
        and ENV_BAD. Commands, unknown commands and unknown prefixes are all
        answered on rqueue right here, and no reconfiguration happens.

        Otherwise the reconfiguration timer is started, resources are loaded
        from path, and _load_ir() takes over (including, presumably, the
        reply on rqueue -- nothing below this point responds).
        """
        self.logger.debug("loading configuration from disk: %s" % path)

        # The "path" here can just be a path, but it can also be a command for testing,
        # if the user has chosen to allow that.

        if self.app.allow_fs_commands and (":" in path):
            pfx, rest = path.split(":", 1)

            if pfx.lower() == "cmd":
                fields = rest.split(":", 1)

                cmd = fields[0].upper()

                # NOTE(review): args is computed but never used by any of the
                # commands below.
                args = fields[1:] if (len(fields) > 1) else None

                if cmd.upper() == "CHIME":
                    self.logger.info("CMD: Chiming")

                    self.chime()

                    self._respond(rqueue, 200, "Chimed")
                elif cmd.upper() == "CHIME_RESET":
                    # Back to the same chime state that __init__ sets up.
                    self.chimed = False
                    self.last_chime = False
                    self.env_good = False

                    self.app.scout.reset_events()
                    self.app.scout.report(mode="boot", action="boot1", no_cache=True)

                    self.logger.info("CMD: Reset chime state")
                    self._respond(rqueue, 200, "CMD: Reset chime state")
                elif cmd.upper() == "SCOUT_CACHE_RESET":
                    self.app.scout.reset_cache_time()

                    self.logger.info("CMD: Reset Scout cache time")
                    self._respond(rqueue, 200, "CMD: Reset Scout cache time")
                elif cmd.upper() == "ENV_OK":
                    self.env_good = True
                    self.failure_list = []

                    self.logger.info("CMD: Marked environment good")
                    self._respond(rqueue, 200, "CMD: Marked environment good")
                elif cmd.upper() == "ENV_BAD":
                    self.env_good = False
                    self.failure_list = ["failure forced"]

                    self.logger.info("CMD: Marked environment bad")
                    self._respond(rqueue, 200, "CMD: Marked environment bad")
                else:
                    self.logger.info(f'CMD: no such command "{cmd}"')
                    self._respond(rqueue, 400, f'CMD: no such command "{cmd}"')

                return
            else:
                self.logger.info(f'CONFIG_FS: invalid prefix "{pfx}"')
                self._respond(rqueue, 400, f'CONFIG_FS: invalid prefix "{pfx}"')

            return

        # OK, we're starting a reconfiguration. BE CAREFUL TO STOP THE TIMER
        # BEFORE YOU RESPOND TO THE CALLER.
        self.app.config_timer.start()

        # The snapshot name is the path with every character outside
        # [A-Za-z0-9_-] replaced by "_".
        snapshot = re.sub(r"[^A-Za-z0-9_-]", "_", path)
        # NOTE(review): from here on the module-level "app" is used where the
        # code above uses self.app -- presumably the same object; confirm.
        scc = FSSecretHandler(app.logger, path, app.snapshot_path, "0")

        with self.app.fetcher_timer:
            aconf = Config()
            fetcher = ResourceFetcher(app.logger, aconf)
            fetcher.load_from_filesystem(path, k8s=app.k8s, recurse=True)

        if not fetcher.elements:
            self.logger.debug("no configuration resources found at %s" % path)
            # Don't bail from here -- go ahead and reload the IR.
            #
            # XXX This is basically historical logic, honestly. But if you try
            # to respond from here and bail, STOP THE RECONFIGURATION TIMER.

        self._load_ir(rqueue, aconf, fetcher, scc, snapshot)
  1589
    # load_config_watt reconfigures from a snapshot fetched from the given URL. It's
    # the one true way of reconfiguring these days.
  1592    #
  1593    # BE CAREFUL ABOUT STOPPING THE RECONFIGURATION TIMER ONCE IT IS STARTED.
  1594    def load_config_watt(self, rqueue: queue.Queue, url: str):
  1595        snapshot = url.split("/")[-1]
  1596        ss_path = os.path.join(app.snapshot_path, "snapshot-tmp.yaml")
  1597
  1598        # OK, we're starting a reconfiguration. BE CAREFUL TO STOP THE TIMER
  1599        # BEFORE YOU RESPOND TO THE CALLER.
  1600        self.app.config_timer.start()
  1601
  1602        self.logger.debug("copying configuration: watt, %s to %s" % (url, ss_path))
  1603
  1604        # Grab the serialization, and save it to disk too.
  1605        serialization = load_url_contents(self.logger, url, stream2=open(ss_path, "w"))
  1606
  1607        if not serialization:
  1608            self.logger.debug("no data loaded from snapshot %s" % snapshot)
  1609            # We never used to return here. I'm not sure if that's really correct?
  1610            #
  1611            # IF YOU CHANGE THIS, BE CAREFUL TO STOP THE RECONFIGURATION TIMER.
  1612
  1613        # Weirdly, we don't need a special WattSecretHandler: parse_watt knows how to handle
  1614        # the secrets that watt sends.
  1615        scc = SecretHandler(app.logger, url, app.snapshot_path, snapshot)
  1616
  1617        # OK. Time the various configuration sections separately.
  1618
  1619        with self.app.fetcher_timer:
  1620            aconf = Config()
  1621            fetcher = ResourceFetcher(app.logger, aconf)
  1622
  1623            if serialization:
  1624                fetcher.parse_watt(serialization)
  1625
  1626        if not fetcher.elements:
  1627            self.logger.debug("no configuration found in snapshot %s" % snapshot)
  1628
  1629            # Don't actually bail here. If they send over a valid config that happens
  1630            # to have nothing for us, it's still a legit config.
  1631            #
  1632            # IF YOU CHANGE THIS, BE CAREFUL TO STOP THE RECONFIGURATION TIMER.
  1633
  1634        self._load_ir(rqueue, aconf, fetcher, scc, snapshot)
  1635
  1636    # _load_ir is where the heavy lifting of a reconfigure happens.
  1637    #
  1638    # AT THE POINT OF ENTRY, THE RECONFIGURATION TIMER IS RUNNING. DO NOT LEAVE
  1639    # THIS METHOD WITHOUT STOPPING THE RECONFIGURATION TIMER.
  1640    def _load_ir(
  1641        self,
  1642        rqueue: queue.Queue,
  1643        aconf: Config,
  1644        fetcher: ResourceFetcher,
  1645        secret_handler: SecretHandler,
  1646        snapshot: str,
  1647    ) -> None:
  1648        with self.app.aconf_timer:
  1649            aconf.load_all(fetcher.sorted())
  1650
  1651            # TODO(Flynn): This is an awful hack. Have aconf.load(fetcher) that does
  1652            # this correctly.
  1653            #
  1654            # I'm not doing this at this moment because aconf.load_all() is called in a
  1655            # lot of places, and I don't want to destablize 2.2.2.
  1656            aconf.load_invalid(fetcher)
  1657
  1658        aconf_path = os.path.join(app.snapshot_path, "aconf-tmp.json")
  1659        open(aconf_path, "w").write(aconf.as_json())
  1660
  1661        # OK. What kind of reconfiguration are we doing?
  1662        config_type, reset_cache, invalidate_groups_for = IR.check_deltas(
  1663            self.logger, fetcher, self.app.cache
  1664        )
  1665
  1666        if reset_cache:
  1667            self.logger.debug("RESETTING CACHE")
  1668            self.app.cache = Cache(self.logger)
  1669
  1670        with self.app.ir_timer:
  1671            ir = IR(
  1672                aconf,
  1673                secret_handler=secret_handler,
  1674                invalidate_groups_for=invalidate_groups_for,
  1675                cache=self.app.cache,
  1676            )
  1677
  1678        ir_path = os.path.join(app.snapshot_path, "ir-tmp.json")
  1679        open(ir_path, "w").write(ir.as_json())
  1680
  1681        with self.app.econf_timer:
  1682            self.logger.debug("generating envoy configuration")
  1683            econf = EnvoyConfig.generate(ir, cache=self.app.cache)
  1684
  1685        # DON'T generate the Diagnostics here, because that turns out to be expensive.
  1686        # Instead, we'll just reset app.diag to None, then generate it on-demand when
  1687        # we need it.
  1688        #
  1689        # DO go ahead and split the Envoy config into its components for later, though.
  1690        bootstrap_config, ads_config, clustermap = econf.split_config()
  1691
  1692        # OK. Assume that the Envoy config is valid...
  1693        econf_is_valid = True
  1694        econf_bad_reason = ""
  1695
  1696        # ...then look for reasons it's not valid.
  1697        if not econf.has_listeners():
  1698            # No listeners == something in the Ambassador config is totally horked.
  1699            # Probably this is the user not defining any Hosts that match
  1700            # the Listeners in the system.
  1701            #
  1702            # As it happens, Envoy is OK running a config with no listeners, and it'll
  1703            # answer on port 8001 for readiness checks, so... log a notice, but run with it.
  1704            self.logger.warning(
  1705                "No active listeners at all; check your Listener and Host configuration"
  1706            )
  1707        elif not self.validate_envoy_config(
  1708            ir, config=ads_config, retries=self.app.validation_retries
  1709        ):
  1710            # Invalid Envoy config probably indicates a bug in Emissary itself. Sigh.
  1711            econf_is_valid = False
  1712            econf_bad_reason = "invalid envoy configuration generated"
  1713
  1714        # OK. Is the config invalid?
  1715        if not econf_is_valid:
  1716            # BZzzt. Don't post this update.
  1717            self.logger.info(
  1718                "no update performed (%s), continuing with current configuration..."
  1719                % econf_bad_reason
  1720            )
  1721
  1722            # Don't use app.check_scout; it will deadlock.
  1723            self.check_scout("attempted bad update")
  1724
  1725            # DO stop the reconfiguration timer before leaving.
  1726            self.app.config_timer.stop()
  1727            self._respond(
  1728                rqueue, 500, "ignoring (%s) in snapshot %s" % (econf_bad_reason, snapshot)
  1729            )
  1730            return
  1731
  1732        snapcount = int(os.environ.get("AMBASSADOR_SNAPSHOT_COUNT", "4"))
  1733        snaplist: List[Tuple[str, str]] = []
  1734
  1735        if snapcount > 0:
  1736            self.logger.debug("rotating snapshots for snapshot %s" % snapshot)
  1737
  1738            # If snapcount is 4, this range statement becomes range(-4, -1)
  1739            # which gives [ -4, -3, -2 ], which the list comprehension turns
  1740            # into [ ( "-3", "-4" ), ( "-2", "-3" ), ( "-1", "-2" ) ]...
  1741            # which is the list of suffixes to rename to rotate the snapshots.
  1742
  1743            snaplist += [(str(x + 1), str(x)) for x in range(-1 * snapcount, -1)]
  1744
  1745            # After dealing with that, we need to rotate the current file into -1.
  1746            snaplist.append(("", "-1"))
  1747
  1748        # Whether or not we do any rotation, we need to cycle in the '-tmp' file.
  1749        snaplist.append(("-tmp", ""))
  1750
  1751        for from_suffix, to_suffix in snaplist:
  1752            for fmt in ["aconf{}.json", "econf{}.json", "ir{}.json", "snapshot{}.yaml"]:
  1753                from_path = os.path.join(app.snapshot_path, fmt.format(from_suffix))
  1754                to_path = os.path.join(app.snapshot_path, fmt.format(to_suffix))
  1755
  1756                # Make sure we don't leave this method on error! The reconfiguration
  1757                # timer is still running, but also, the snapshots are a debugging aid:
  1758                # if we can't rotate them, meh, whatever.
  1759
  1760                try:
  1761                    self.logger.debug("rotate: %s -> %s" % (from_path, to_path))
  1762                    os.rename(from_path, to_path)
  1763                except IOError as e:
  1764                    self.logger.debug("skip %s -> %s: %s" % (from_path, to_path, e))
  1765                except Exception as e:
  1766                    self.logger.debug("could not rename %s -> %s: %s" % (from_path, to_path, e))
  1767
  1768        app.latest_snapshot = snapshot
  1769        self.logger.debug("saving Envoy configuration for snapshot %s" % snapshot)
  1770
  1771        with open(app.bootstrap_path, "w") as output:
  1772            output.write(dump_json(bootstrap_config, pretty=True))
  1773
  1774        with open(app.ads_path, "w") as output:
  1775            output.write(dump_json(ads_config, pretty=True))
  1776
  1777        with open(app.clustermap_path, "w") as output:
  1778            output.write(dump_json(clustermap, pretty=True))
  1779
  1780        with app.config_lock:
  1781            app.aconf = aconf
  1782            app.ir = ir
  1783            app.econf = econf
  1784
  1785            # Force app.diag to None so that it'll be regenerated on-demand.
  1786            app.diag = None
  1787
  1788        # We're finally done with the whole configuration process.
  1789        self.app.config_timer.stop()
  1790
  1791        if app.kick:
  1792            self.logger.debug("running '%s'" % app.kick)
  1793            os.system(app.kick)
  1794        elif app.ambex_pid != 0:
  1795            self.logger.debug("notifying PID %d ambex" % app.ambex_pid)
  1796            os.kill(app.ambex_pid, signal.SIGHUP)
  1797
  1798        # don't worry about TCPMappings yet
  1799        mappings = app.aconf.get_config("mappings")
  1800
  1801        if mappings:
  1802            for mapping_name, mapping in mappings.items():
  1803                app.kubestatus.mark_live(
  1804                    "Mapping", mapping_name, mapping.get("namespace", Config.ambassador_namespace)
  1805                )
  1806
  1807        app.kubestatus.prune()
  1808
  1809        if app.ir.k8s_status_updates:
  1810            update_count = 0
  1811
  1812            for name in app.ir.k8s_status_updates.keys():
  1813                update_count += 1
  1814                # Strip off any namespace in the name.
  1815                resource_name = name.split(".", 1)[0]
  1816                kind, namespace, update = app.ir.k8s_status_updates[name]
  1817                text = dump_json(update)
  1818
  1819                # self.logger.debug(f"K8s status update: {kind} {resource_name}.{namespace}, {text}...")
  1820
  1821                app.kubestatus.post(kind, resource_name, namespace, text)
  1822
  1823        group_count = len(app.ir.groups)
  1824        cluster_count = len(app.ir.clusters)
  1825        listener_count = len(app.ir.listeners)
  1826        service_count = len(app.ir.services)
  1827
  1828        self._respond(
  1829            rqueue, 200, "configuration updated (%s) from snapshot %s" % (config_type, snapshot)
  1830        )
  1831
  1832        self.logger.info(
  1833            "configuration updated (%s) from snapshot %s (S%d L%d G%d C%d)"
  1834            % (config_type, snapshot, service_count, listener_count, group_count, cluster_count)
  1835        )
  1836
  1837        # Remember that we've reconfigured.
  1838        self.app.reconf_stats.mark(config_type)
  1839
  1840        if app.health_checks and not app.stats_updater:
  1841            app.logger.debug("starting Envoy status updater")
  1842            app.stats_updater = PeriodicTrigger(app.watcher.update_estats, period=5)
  1843
  1844        # Check our environment...
  1845        self.check_environment()
  1846
  1847        self.chime()
  1848
  1849    def chime(self):
  1850        # In general, our reports here should be action "update", and they should honor the
  1851        # Scout cache, but we need to tweak that depending on whether we've done this before
  1852        # and on whether the environment looks OK.
  1853
  1854        already_chimed = bool_fmt(self.chimed)
  1855        was_ok = bool_fmt(self.last_chime)
  1856        now_ok = bool_fmt(self.env_good)
  1857
  1858        # Poor man's state machine...
  1859        action_key = f"{already_chimed}-{was_ok}-{now_ok}"
  1860        action, no_cache = AmbassadorEventWatcher.Actions[action_key]
  1861
  1862        self.logger.debug(f"CHIME: {action_key}")
  1863
  1864        class ChimeArgs(TypedDict):
  1865            no_cache: NotRequired[Optional[bool]]
  1866            ir: NotRequired[Optional[IR]]
  1867            failures: NotRequired[Optional[List[str]]]
  1868            action_key: NotRequired[Optional[str]]
  1869
  1870        chime_args: ChimeArgs = {"no_cache": no_cache, "failures": self.failure_list}
  1871
  1872        if self.app.report_action_keys:
  1873            chime_args["action_key"] = action_key
  1874
  1875        # Don't use app.check_scout; it will deadlock.
  1876        self.check_scout(action, **chime_args)
  1877
  1878        # Remember that we have now chimed...
  1879        self.chimed = True
  1880
  1881        # ...and remember what we sent for that chime.
  1882        self.last_chime = self.env_good
  1883
  1884    def check_environment(self, ir: Optional[IR] = None) -> None:
  1885        env_good = True
  1886        chime_failures = {}
  1887        env_status = SystemStatus()
  1888
  1889        error_count = 0
  1890        tls_count = 0
  1891        mapping_count = 0
  1892
  1893        if not ir:
  1894            ir = app.ir
  1895
  1896        if not ir:
  1897            chime_failures["no config loaded"] = True
  1898            env_good = False
  1899        else:
  1900            if not ir.aconf:
  1901                chime_failures["completely empty config"] = True
  1902                env_good = False
  1903            else:
  1904                for err_key, err_list in ir.aconf.errors.items():
  1905                    if err_key == "-global-":
  1906                        err_key = ""
  1907
  1908                    for err in err_list:
  1909                        error_count += 1
  1910                        err_text = err["error"]
  1911
  1912                        self.app.logger.info(f"error {err_key} {err_text}")
  1913
  1914                        if err_text.find("CRD") >= 0:
  1915                            if err_text.find("core") >= 0:
  1916                                chime_failures["core CRDs"] = True
  1917                                env_status.failure("CRDs", "Core CRD type definitions are missing")
  1918                            else:
  1919                                chime_failures["other CRDs"] = True
  1920                                env_status.failure(
  1921                                    "CRDs", "Resolver CRD type definitions are missing"
  1922                                )
  1923
  1924                            env_good = False
  1925                        elif err_text.find("TLS") >= 0:
  1926                            chime_failures["TLS errors"] = True
  1927                            env_status.failure("TLS", err_text)
  1928
  1929                            env_good = False
  1930
  1931            for context in ir.tls_contexts:
  1932                if context:
  1933                    tls_count += 1
  1934                    break
  1935
  1936            for group in ir.groups.values():
  1937                for mapping in group.mappings:
  1938                    pfx = mapping.get("prefix", None)
  1939                    name = mapping.get("name", None)
  1940
  1941                    if pfx:
  1942                        if not pfx.startswith("/ambassador/v0") or not name.startswith("internal_"):
  1943                            mapping_count += 1
  1944
  1945        if error_count:
  1946            env_status.failure(
  1947                "Error check",
  1948                f'{error_count} total error{"" if (error_count == 1) else "s"} logged',
  1949            )
  1950            env_good = False
  1951        else:
  1952            env_status.OK("Error check", "No errors logged")
  1953
  1954        if tls_count:
  1955            env_status.OK(
  1956                "TLS", f'{tls_count} TLSContext{" is" if (tls_count == 1) else "s are"} active'
  1957            )
  1958        else:
  1959            chime_failures["no TLS contexts"] = True
  1960            env_status.failure("TLS", "No TLSContexts are active")
  1961
  1962            env_good = False
  1963
  1964        if mapping_count:
  1965            env_status.OK(
  1966                "Mappings",
  1967                f'{mapping_count} Mapping{" is" if (mapping_count == 1) else "s are"} active',
  1968            )
  1969        else:
  1970            chime_failures["no Mappings"] = True
  1971            env_status.failure("Mappings", "No Mappings are active")
  1972            env_good = False
  1973
  1974        failure_list: List[str] = []
  1975
  1976        if not env_good:
  1977            failure_list = list(sorted(chime_failures.keys()))
  1978
  1979        self.env_good = env_good
  1980        self.env_status = env_status
  1981        self.failure_list = failure_list
  1982
  1983    def check_scout(
  1984        self,
  1985        what: str,
  1986        no_cache: Optional[bool] = False,
  1987        ir: Optional[IR] = None,
  1988        failures: Optional[List[str]] = None,
  1989        action_key: Optional[str] = None,
  1990    ) -> None:
  1991        now = datetime.datetime.now()
  1992        uptime = now - boot_time
  1993        hr_uptime = td_format(uptime)
  1994
  1995        if not ir:
  1996            ir = app.ir
  1997
  1998        self.app.notices.reset()
  1999
  2000        scout_args = {"uptime": int(uptime.total_seconds()), "hr_uptime": hr_uptime}
  2001
  2002        if failures:
  2003            scout_args["failures"] = failures
  2004
  2005        if action_key:
  2006            scout_args["action_key"] = action_key
  2007
  2008        if ir:
  2009            self.app.logger.debug("check_scout: we have an IR")
  2010
  2011            if not os.environ.get("AMBASSADOR_DISABLE_FEATURES", None):
  2012                self.app.logger.debug("check_scout: including features")
  2013                feat = ir.features()
  2014
  2015                # Include features about the cache and incremental reconfiguration,
  2016                # too.
  2017
  2018                if self.app.cache is not None:
  2019                    # Fast reconfigure is on. Supply the real info.
  2020                    feat["frc_enabled"] = True
  2021                    feat["frc_cache_hits"] = self.app.cache.hits
  2022                    feat["frc_cache_misses"] = self.app.cache.misses
  2023                    feat["frc_inv_calls"] = self.app.cache.invalidate_calls
  2024                    feat["frc_inv_objects"] = self.app.cache.invalidated_objects
  2025                else:
  2026                    # Fast reconfigure is off.
  2027                    feat["frc_enabled"] = False
  2028
  2029                # Whether the cache is on or off, we can talk about reconfigurations.
  2030                feat["frc_incr_count"] = self.app.reconf_stats.counts["incremental"]
  2031                feat["frc_complete_count"] = self.app.reconf_stats.counts["complete"]
  2032                feat["frc_check_count"] = self.app.reconf_stats.checks
  2033                feat["frc_check_errors"] = self.app.reconf_stats.errors
  2034
  2035                request_data = app.estatsmgr.get_stats().requests
  2036
  2037                if request_data:
  2038                    self.app.logger.debug("check_scout: including requests")
  2039
  2040                    for rkey in request_data.keys():
  2041                        cur = request_data[rkey]
  2042                        prev = app.last_request_info.get(rkey, 0)
  2043                        feat[f"request_{rkey}_count"] = max(cur - prev, 0)
  2044
  2045                    lrt = app.last_request_time or boot_time
  2046                    since_lrt = now - lrt
  2047                    elapsed = since_lrt.total_seconds()
  2048                    hr_elapsed = td_format(since_lrt)
  2049
  2050                    app.last_request_time = now
  2051                    app.last_request_info = request_data
  2052
  2053                    feat["request_elapsed"] = elapsed
  2054                    feat["request_hr_elapsed"] = hr_elapsed
  2055
  2056                scout_args["features"] = feat
  2057
  2058        scout_result = self.app.scout.report(
  2059            mode="diagd", action=what, no_cache=no_cache, **scout_args
  2060        )
  2061        scout_notices = scout_result.pop("notices", [])
  2062
  2063        global_loglevel = self.app.logger.getEffectiveLevel()
  2064
  2065        self.app.logger.debug(f"Scout section: global loglevel {global_loglevel}")
  2066
  2067        for notice in scout_notices:
  2068            notice_level_name = notice.get("level") or "INFO"
  2069            notice_level = logging.getLevelName(notice_level_name)
  2070
  2071            if notice_level >= global_loglevel:
  2072                self.app.logger.debug(f"Scout section: include {notice}")
  2073                self.app.notices.post(notice)
  2074            else:
  2075                self.app.logger.debug(f"Scout section: skip {notice}")
  2076
  2077        self.app.logger.debug("Scout reports %s" % dump_json(scout_result))
  2078        self.app.logger.debug("Scout notices: %s" % dump_json(scout_notices))
  2079        self.app.logger.debug("App notices after scout: %s" % dump_json(app.notices.notices))
  2080
  2081    def validate_envoy_config(self, ir: IR, config, retries) -> bool:
  2082        if self.app.no_envoy:
  2083            self.app.logger.debug("Skipping validation")
  2084            return True
  2085
  2086        # We want to keep the original config untouched
  2087        validation_config = copy.deepcopy(config)
  2088
  2089        # Envoy fails to validate with @type field in envoy config, so removing that
  2090        validation_config.pop("@type")
  2091
  2092        if os.environ.get("AMBASSADOR_DEBUG_CLUSTER_CONFIG", "false").lower() == "true":
  2093            vconf_clusters = validation_config["static_resources"]["clusters"]
  2094
  2095            if len(vconf_clusters) > 10:
  2096                vconf_clusters.append(copy.deepcopy(vconf_clusters[10]))
  2097
  2098            # Check for cluster-name weirdness.
  2099            _v2_clusters = {}
  2100            _problems = []
  2101
  2102            for name in sorted(ir.clusters.keys()):
  2103                if AmbassadorEventWatcher.reCompressed.search(name):
  2104                    _problems.append(f"IR pre-compressed cluster {name}")
  2105
  2106            for cluster in validation_config["static_resources"]["clusters"]:
  2107                name = cluster["name"]
  2108
  2109                if name in _v2_clusters:
  2110                    _problems.append(f"V2 dup cluster {name}")
  2111                _v2_clusters[name] = True
  2112
  2113            if _problems:
  2114                self.logger.error("ENVOY CONFIG PROBLEMS:\n%s", "\n".join(_problems))
  2115                stamp = datetime.datetime.now().isoformat()
  2116
  2117                bad_snapshot = open(
  2118                    os.path.join(app.snapshot_path, "snapshot-tmp.yaml"), "r"
  2119                ).read()
  2120
  2121                cache_dict: Dict[str, Any] = {}
  2122                cache_links: Dict[str, Any] = {}
  2123
  2124                if self.app.cache:
  2125                    for k, c in self.app.cache.cache.items():
  2126                        v: Any = c[0]
  2127
  2128                        if getattr(v, "as_dict", None):
  2129                            v = v.as_dict()
  2130
  2131                        cache_dict[k] = v
  2132
  2133                    cache_links = {k: list(v) for k, v in self.app.cache.links.items()}
  2134
  2135                bad_dict = {
  2136                    "ir": ir.as_dict(),
  2137                    "v2": config,
  2138                    "validation": validation_config,
  2139                    "problems": _problems,
  2140                    "snapshot": bad_snapshot,
  2141                    "cache": cache_dict,
  2142                    "links": cache_links,
  2143                }
  2144
  2145                bad_dict_str = dump_json(bad_dict, pretty=True)
  2146                with open(os.path.join(app.snapshot_path, f"problems-{stamp}.json"), "w") as output:
  2147                    output.write(bad_dict_str)
  2148
  2149        config_json = dump_json(validation_config, pretty=True)
  2150
  2151        econf_validation_path = os.path.join(app.snapshot_path, "econf-tmp.json")
  2152
  2153        with open(econf_validation_path, "w") as output:
  2154            output.write(config_json)
  2155
  2156        command = [
  2157            "envoy",
  2158            "--service-node",
  2159            "test-id",
  2160            "--service-cluster",
  2161            ir.ambassador_nodename,
  2162            "--config-path",
  2163            econf_validation_path,
  2164            "--mode",
  2165            "validate",
  2166        ]
  2167
  2168        v_exit = 0
  2169        v_encoded = "".encode("utf-8")
  2170
  2171        # Try to validate the Envoy config. Short circuit and fall through
  2172        # immediately on concrete success or failure, and retry (up to the
  2173        # limit) on timeout.
  2174        #
  2175        # The default timeout is 5s, but this can be overridden in the Ambassador
  2176        # module.
  2177
  2178        amod = ir.ambassador_module
  2179        timeout = amod.envoy_validation_timeout if amod else IRAmbassador.default_validation_timeout
  2180
  2181        # If the timeout is zero, don't do the validation.
  2182        if timeout == 0:
  2183            self.logger.debug("not validating Envoy configuration since timeout is 0")
  2184            return True
  2185
  2186        self.logger.debug(f"validating Envoy configuration with timeout {timeout}")
  2187
  2188        for retry in range(retries):
  2189            try:
  2190                v_encoded = subprocess.check_output(
  2191                    command, stderr=subprocess.STDOUT, timeout=timeout
  2192                )
  2193                v_exit = 0
  2194                break
  2195            except subprocess.CalledProcessError as e:
  2196                v_exit = e.returncode
  2197                v_encoded = e.output
  2198                break
  2199            except subprocess.TimeoutExpired as e:
  2200                v_exit = 1
  2201                v_encoded = e.output or "".encode("utf-8")
  2202
  2203                self.logger.warn(
  2204                    "envoy configuration validation timed out after {} seconds{}\n{}".format(
  2205                        timeout,
  2206                        ", retrying..." if retry < retries - 1 else "",
  2207                        v_encoded.decode("utf-8"),
  2208                    )
  2209                )
  2210
  2211                # Don't break here; continue on to the next iteration of the loop.
  2212
  2213        if v_exit == 0:
  2214            self.logger.debug(
  2215                "successfully validated the resulting envoy configuration, continuing..."
  2216            )
  2217            return True
  2218
  2219        v_str = typecast(str, v_encoded)
  2220
  2221        try:
  2222            v_str = v_encoded.decode("utf-8")
  2223        except:
  2224            pass
  2225
  2226        self.logger.error(
  2227            "{}\ncould not validate the envoy configuration above after {} retries, failed with error \n{}\n(exit code {})\nAborting update...".format(
  2228                config_json, retries, v_str, v_exit
  2229            )
  2230        )
  2231        return False
  2232
  2233
  2234class StandaloneApplication(gunicorn.app.base.BaseApplication):
  2235    def __init__(self, app, options=None):
  2236        self.options = options or {}
  2237        self.application = app
  2238        super(StandaloneApplication, self).__init__()
  2239
  2240        # Boot chime. This is basically the earliest point at which we can consider an Ambassador
  2241        # to be "running".
  2242        scout_result = self.application.scout.report(mode="boot", action="boot1", no_cache=True)
  2243        self.application.logger.debug(f"BOOT: Scout result {dump_json(scout_result)}")
  2244        self.application.logger.info(f"Ambassador {__version__} booted")
  2245
  2246    def load_config(self):
  2247        config = dict(
  2248            [
  2249                (key, value)
  2250                for key, value in self.options.items()
  2251                if key in self.cfg.settings and value is not None
  2252            ]
  2253        )
  2254
  2255        for key, value in config.items():
  2256            self.cfg.set(key.lower(), value)
  2257
  2258    def load(self):
  2259        # This is a little weird, but whatever.
  2260        self.application.watcher = AmbassadorEventWatcher(self.application)
  2261        self.application.watcher.start()
  2262
  2263        if self.application.config_path:
  2264            self.application.watcher.post("CONFIG_FS", self.application.config_path)
  2265
  2266        return self.application
  2267
  2268
  2269@click.command()
  2270@click.argument("snapshot-path", type=click.Path(), required=False)
  2271@click.argument("bootstrap-path", type=click.Path(), required=False)
  2272@click.argument("ads-path", type=click.Path(), required=False)
  2273@click.option(
  2274    "--config-path",
  2275    type=click.Path(),
  2276    help="Optional configuration path to scan for Ambassador YAML files",
  2277)
  2278@click.option(
  2279    "--k8s",
  2280    is_flag=True,
  2281    help="If True, assume config_path contains Kubernetes resources (only relevant with config_path)",
  2282)
  2283@click.option(
  2284    "--ambex-pid",
  2285    type=int,
  2286    default=0,
  2287    help="Optional PID to signal with HUP after updating Envoy configuration",
  2288    show_default=True,
  2289)
  2290@click.option("--kick", type=str, help="Optional command to run after updating Envoy configuration")
  2291@click.option(
  2292    "--banner-endpoint",
  2293    type=str,
  2294    default="http://127.0.0.1:8500/banner",
  2295    help="Optional endpoint of extra banner to include",
  2296    show_default=True,
  2297)
  2298@click.option(
  2299    "--metrics-endpoint",
  2300    type=str,
  2301    default="http://127.0.0.1:8500/metrics",
  2302    help="Optional endpoint of extra prometheus metrics to include",
  2303    show_default=True,
  2304)
  2305@click.option("--no-checks", is_flag=True, help="If True, don't do Envoy-cluster health checking")
  2306@click.option("--no-envoy", is_flag=True, help="If True, don't interact with Envoy at all")
  2307@click.option("--reload", is_flag=True, help="If True, run Flask in debug mode for live reloading")
  2308@click.option("--debug", is_flag=True, help="If True, do debug logging")
  2309@click.option(
  2310    "--dev-magic",
  2311    is_flag=True,
  2312    help="If True, override a bunch of things for Datawire dev-loop stuff",
  2313)
  2314@click.option("--verbose", is_flag=True, help="If True, do really verbose debug logging")
  2315@click.option(
  2316    "--workers", type=int, help="Number of workers; default is based on the number of CPUs present"
  2317)
  2318@click.option("--host", type=str, help="Interface on which to listen")
  2319@click.option("--port", type=int, default=-1, help="Port on which to listen", show_default=True)
  2320@click.option("--notices", type=click.Path(), help="Optional file to read for local notices")
  2321@click.option(
  2322    "--validation-retries",
  2323    type=int,
  2324    default=5,
  2325    help="Number of times to retry Envoy configuration validation after a timeout",
  2326    show_default=True,
  2327)
  2328@click.option(
  2329    "--allow-fs-commands",
  2330    is_flag=True,
  2331    help="If true, allow CONFIG_FS to support debug/testing commands",
  2332)
  2333@click.option(
  2334    "--local-scout",
  2335    is_flag=True,
  2336    help="Don't talk to remote Scout at all; keep everything purely local",
  2337)
  2338@click.option("--report-action-keys", is_flag=True, help="Report action keys when chiming")
  2339def main(
  2340    snapshot_path=None,
  2341    bootstrap_path=None,
  2342    ads_path=None,
  2343    *,
  2344    dev_magic=False,
  2345    config_path=None,
  2346    ambex_pid=0,
  2347    kick=None,
  2348    banner_endpoint="http://127.0.0.1:8500/banner",
  2349    metrics_endpoint="http://127.0.0.1:8500/metrics",
  2350    k8s=False,
  2351    no_checks=False,
  2352    no_envoy=False,
  2353    reload=False,
  2354    debug=False,
  2355    verbose=False,
  2356    workers=None,
  2357    port=-1,
  2358    host="",
  2359    notices=None,
  2360    validation_retries=5,
  2361    allow_fs_commands=False,
  2362    local_scout=False,
  2363    report_action_keys=False,
  2364):
  2365    """
  2366    Run the diagnostic daemon.
  2367
  2368    Arguments:
  2369
  2370      SNAPSHOT_PATH
  2371        Path to directory in which to save configuration snapshots and dynamic secrets
  2372
  2373      BOOTSTRAP_PATH
  2374        Path to which to write bootstrap Envoy configuration
  2375
  2376      ADS_PATH
  2377        Path to which to write ADS Envoy configuration
  2378    """
  2379
  2380    enable_fast_reconfigure = parse_bool(os.environ.get("AMBASSADOR_FAST_RECONFIGURE", "true"))
  2381
  2382    if port < 0:
  2383        port = Constants.DIAG_PORT if not enable_fast_reconfigure else Constants.DIAG_PORT_ALT
  2384        # port = Constants.DIAG_PORT
  2385
  2386    if not host:
  2387        host = "0.0.0.0" if not enable_fast_reconfigure else "127.0.0.1"
  2388
  2389    if dev_magic:
  2390        # Override the world.
  2391        os.environ["SCOUT_HOST"] = "127.0.0.1:9999"
  2392        os.environ["SCOUT_HTTPS"] = "no"
  2393
  2394        no_checks = True
  2395        no_envoy = True
  2396
  2397        os.makedirs("/tmp/snapshots", mode=0o755, exist_ok=True)
  2398
  2399        snapshot_path = "/tmp/snapshots"
  2400        bootstrap_path = "/tmp/boot.json"
  2401        ads_path = "/tmp/ads.json"
  2402
  2403        port = 9998
  2404
  2405        allow_fs_commands = True
  2406        local_scout = True
  2407        report_action_keys = True
  2408
  2409    if no_envoy:
  2410        no_checks = True
  2411
  2412    # Create the application itself.
  2413    app.setup(
  2414        snapshot_path,
  2415        bootstrap_path,
  2416        ads_path,
  2417        config_path,
  2418        ambex_pid,
  2419        kick,
  2420        banner_endpoint,
  2421        metrics_endpoint,
  2422        k8s,
  2423        not no_checks,
  2424        no_envoy,
  2425        reload,
  2426        debug,
  2427        verbose,
  2428        notices,
  2429        validation_retries,
  2430        allow_fs_commands,
  2431        local_scout,
  2432        report_action_keys,
  2433        enable_fast_reconfigure,
  2434    )
  2435
  2436    if not workers:
  2437        workers = number_of_workers()
  2438
  2439    gunicorn_config = {
  2440        "bind": "%s:%s" % (host, port),
  2441        # 'workers': 1,
  2442        "threads": workers,
  2443    }
  2444
  2445    app.logger.info(
  2446        "thread count %d, listening on %s" % (gunicorn_config["threads"], gunicorn_config["bind"])
  2447    )
  2448
  2449    StandaloneApplication(app, gunicorn_config).run()
  2450
  2451
# Script entry point. main() is wrapped by @click.command(), so it is invoked
# with no arguments here and click supplies its parameters from the command line.
if __name__ == "__main__":
    main()

View as plain text