...

Text file src/github.com/datawire/ambassador/v2/python/ambassador_diag/diagd.py

Documentation: github.com/datawire/ambassador/v2/python/ambassador_diag

     1#!python
     2
     3# Copyright 2018 Datawire. All rights reserved.
     4#
     5# Licensed under the Apache License, Version 2.0 (the "License");
     6# you may not use this file except in compliance with the License.
     7# You may obtain a copy of the License at
     8#
     9#     http://www.apache.org/licenses/LICENSE-2.0
    10#
    11# Unless required by applicable law or agreed to in writing, software
    12# distributed under the License is distributed on an "AS IS" BASIS,
    13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14# See the License for the specific language governing permissions and
    15# limitations under the License
    16import concurrent.futures
    17import copy
    18import datetime
    19import difflib
    20import functools
    21import json
    22import logging
    23import multiprocessing
    24import os
    25import queue
    26import re
    27import signal
    28import subprocess
    29import sys
    30import threading
    31import time
    32import traceback
    33import uuid
    34from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
    35from typing import cast as typecast
    36
    37import click
    38import gunicorn.app.base
    39import jsonpatch
    40import requests
    41from expiringdict import ExpiringDict
    42from flask import Flask, Response
    43from flask import json as flask_json
    44from flask import jsonify, render_template, request, send_from_directory
    45from pkg_resources import Requirement, resource_filename
    46from prometheus_client import CollectorRegistry, Gauge, Info, ProcessCollector, generate_latest
    47from pythonjsonlogger import jsonlogger
    48
    49from ambassador import IR, Cache, Config, Diagnostics, EnvoyConfig, Scout, Version
    50from ambassador.constants import Constants
    51from ambassador.diagnostics import EnvoyStats, EnvoyStatsMgr
    52from ambassador.fetch import ResourceFetcher
    53from ambassador.ir.irambassador import IRAmbassador
    54from ambassador.reconfig_stats import ReconfigStats
    55from ambassador.utils import (
    56    FSSecretHandler,
    57    PeriodicTrigger,
    58    SecretHandler,
    59    SystemInfo,
    60    Timer,
    61    dump_json,
    62    load_url_contents,
    63    parse_bool,
    64    parse_json,
    65)
    66
    67if TYPE_CHECKING:
    68    from ambassador.ir.irtlscontext import IRTLSContext  # pragma: no cover
    69
    70__version__ = Version
    71
    72boot_time = datetime.datetime.now()
    73
    74# allows 10 concurrent users, with a request timeout of 60 seconds
    75tvars_cache = ExpiringDict(max_len=10, max_age_seconds=60)
    76
    77logHandler = None
    78if parse_bool(os.environ.get("AMBASSADOR_JSON_LOGGING", "false")):
    79    jsonFormatter = jsonlogger.JsonFormatter(
    80        "%%(asctime)s %%(filename)s %%(lineno)d %%(process)d (threadName)s %%(levelname)s %%(message)s"
    81    )
    82    logHandler = logging.StreamHandler()
    83    logHandler.setFormatter(jsonFormatter)
    84
    85    # Set the root logger to INFO level and tell it to use the new log handler.
    86    logger = logging.getLogger()
    87    logger.setLevel(logging.INFO)
    88    logger.addHandler(logHandler)
    89
    90    # Update all of the other loggers to also use the new log handler.
    91    loggingManager = getattr(logging.root, "manager", None)
    92    if loggingManager is not None:
    93        for name in loggingManager.loggerDict:
    94            logging.getLogger(name).addHandler(logHandler)
    95    else:
    96        print("Could not find a logging manager. Some logging may not be properly JSON formatted!")
    97else:
    98    # Default log level
    99    level = logging.INFO
   100
   101    # Check for env var log level
   102    if level_name := os.getenv("AES_LOG_LEVEL"):
   103        level_number = logging.getLevelName(level_name.upper())
   104
   105        if isinstance(level_number, int):
   106            level = level_number
   107
   108    # Set defauts for all loggers
   109    logging.basicConfig(
   110        level=level,
   111        format="%%(asctime)s diagd %s [P%%(process)dT%%(threadName)s] %%(levelname)s: %%(message)s"
   112        % __version__,
   113        datefmt="%Y-%m-%d %H:%M:%S",
   114    )
   115
   116# Shut up Werkzeug's standard request logs -- they're just too noisy.
   117logging.getLogger("werkzeug").setLevel(logging.CRITICAL)
   118
   119# Likewise make requests a bit quieter.
   120logging.getLogger("urllib3").setLevel(logging.WARNING)
   121logging.getLogger("requests").setLevel(logging.WARNING)
   122
   123ambassador_targets = {
   124    "mapping": "https://www.getambassador.io/reference/configuration#mappings",
   125    "module": "https://www.getambassador.io/reference/configuration#modules",
   126}
   127
   128# envoy_targets = {
   129#     'route': 'https://envoyproxy.github.io/envoy/configuration/http_conn_man/route_config/route.html',
   130#     'cluster': 'https://envoyproxy.github.io/envoy/configuration/cluster_manager/cluster.html',
   131# }
   132
   133
   134def number_of_workers():
   135    return (multiprocessing.cpu_count() * 2) + 1
   136
   137
   138class DiagApp(Flask):
   139    cache: Optional[Cache]
   140    ambex_pid: int
   141    kick: Optional[str]
   142    estatsmgr: EnvoyStatsMgr
   143    config_path: Optional[str]
   144    snapshot_path: str
   145    bootstrap_path: str
   146    ads_path: str
   147    clustermap_path: str
   148    health_checks: bool
   149    no_envoy: bool
   150    debugging: bool
   151    allow_fs_commands: bool
   152    report_action_keys: bool
   153    verbose: bool
   154    notice_path: str
   155    logger: logging.Logger
   156    aconf: Config
   157    ir: Optional[IR]
   158    econf: Optional[EnvoyConfig]
   159    # self.diag is actually a property
   160    _diag: Optional[Diagnostics]
   161    notices: "Notices"
   162    scout: Scout
   163    watcher: "AmbassadorEventWatcher"
   164    stats_updater: Optional[PeriodicTrigger]
   165    scout_checker: Optional[PeriodicTrigger]
   166    last_request_info: Dict[str, int]
   167    last_request_time: Optional[datetime.datetime]
   168    latest_snapshot: str
   169    banner_endpoint: Optional[str]
   170    metrics_endpoint: Optional[str]
   171
   172    # Reconfiguration stats
   173    reconf_stats: ReconfigStats
   174
   175    # Custom metrics registry to weed-out default metrics collectors because the
   176    # default collectors can't be prefixed/namespaced with ambassador_.
   177    # Using the default metrics collectors would lead to name clashes between the Python and Go instrumentations.
   178    metrics_registry: CollectorRegistry
   179
   180    config_lock: threading.Lock
   181    diag_lock: threading.Lock
   182
   183    def setup(
   184        self,
   185        snapshot_path: str,
   186        bootstrap_path: str,
   187        ads_path: str,
   188        config_path: Optional[str],
   189        ambex_pid: int,
   190        kick: Optional[str],
   191        banner_endpoint: Optional[str],
   192        metrics_endpoint: Optional[str],
   193        k8s=False,
   194        do_checks=True,
   195        no_envoy=False,
   196        reload=False,
   197        debug=False,
   198        verbose=False,
   199        notices=None,
   200        validation_retries=5,
   201        allow_fs_commands=False,
   202        local_scout=False,
   203        report_action_keys=False,
   204        enable_fast_reconfigure=False,
   205        clustermap_path=None,
   206    ):
   207        self.health_checks = do_checks
   208        self.no_envoy = no_envoy
   209        self.debugging = reload
   210        self.verbose = verbose
   211        self.notice_path = notices
   212        self.notices = Notices(self.notice_path)
   213        self.notices.reset()
   214        self.k8s = k8s
   215        self.validation_retries = validation_retries
   216        self.allow_fs_commands = allow_fs_commands
   217        self.local_scout = local_scout
   218        self.report_action_keys = report_action_keys
   219        self.banner_endpoint = banner_endpoint
   220        self.metrics_endpoint = metrics_endpoint
   221        self.metrics_registry = CollectorRegistry(auto_describe=True)
   222        self.enable_fast_reconfigure = enable_fast_reconfigure
   223
   224        # Init logger, inherits settings from default
   225        self.logger = logging.getLogger("ambassador.diagd")
   226
   227        # Initialize the Envoy stats manager...
   228        self.estatsmgr = EnvoyStatsMgr(self.logger)
   229
   230        # ...and the incremental-reconfigure stats.
   231        self.reconf_stats = ReconfigStats(self.logger)
   232
   233        # This will raise an exception and crash if you pass it a string. That's intentional.
   234        self.ambex_pid = int(ambex_pid)
   235        self.kick = kick
   236
   237        # Initialize the cache if we're allowed to.
   238        if self.enable_fast_reconfigure:
   239            self.logger.info("AMBASSADOR_FAST_RECONFIGURE enabled, initializing cache")
   240            self.cache = Cache(self.logger)
   241        else:
   242            self.logger.info("AMBASSADOR_FAST_RECONFIGURE disabled, not initializing cache")
   243            self.cache = None
   244
   245        # Use Timers to keep some stats on reconfigurations
   246        self.config_timer = Timer("reconfiguration", self.metrics_registry)
   247        self.fetcher_timer = Timer("Fetcher", self.metrics_registry)
   248        self.aconf_timer = Timer("AConf", self.metrics_registry)
   249        self.ir_timer = Timer("IR", self.metrics_registry)
   250        self.econf_timer = Timer("EConf", self.metrics_registry)
   251        self.diag_timer = Timer("Diagnostics", self.metrics_registry)
   252
   253        # Use gauges to keep some metrics on active config
   254        self.diag_errors = Gauge(
   255            f"diagnostics_errors",
   256            f"Number of configuration errors",
   257            namespace="ambassador",
   258            registry=self.metrics_registry,
   259        )
   260        self.diag_notices = Gauge(
   261            f"diagnostics_notices",
   262            f"Number of configuration notices",
   263            namespace="ambassador",
   264            registry=self.metrics_registry,
   265        )
   266        self.diag_log_level = Gauge(
   267            f"log_level",
   268            f"Debug log level enabled or not",
   269            ["level"],
   270            namespace="ambassador",
   271            registry=self.metrics_registry,
   272        )
   273
   274        if debug:
   275            self.logger.setLevel(logging.DEBUG)
   276            self.diag_log_level.labels("debug").set(1)
   277            logging.getLogger("ambassador").setLevel(logging.DEBUG)
   278        else:
   279            self.diag_log_level.labels("debug").set(0)
   280
   281        # Assume that we will NOT update Mapping status.
   282        ksclass: Type[KubeStatus] = KubeStatusNoMappings
   283
   284        if os.environ.get("AMBASSADOR_UPDATE_MAPPING_STATUS", "false").lower() == "true":
   285            self.logger.info("WILL update Mapping status")
   286            ksclass = KubeStatus
   287        else:
   288            self.logger.info("WILL NOT update Mapping status")
   289
   290        self.kubestatus = ksclass(self)
   291
   292        self.config_path = config_path
   293        self.bootstrap_path = bootstrap_path
   294        self.ads_path = ads_path
   295        self.snapshot_path = snapshot_path
   296        self.clustermap_path = clustermap_path or os.path.join(
   297            os.path.dirname(self.bootstrap_path), "clustermap.json"
   298        )
   299
   300        # You must hold config_lock when updating config elements (including diag!).
   301        self.config_lock = threading.Lock()
   302
   303        # When generating new diagnostics, there's a dance around config_lock and
   304        # diag_lock -- see the diag() property.
   305        self.diag_lock = threading.Lock()
   306
   307        # Why are we doing this? Aren't we sure we're singlethreaded here?
   308        # Well, yes. But self.diag is actually a property, and it will raise an
   309        # assertion failure if we're not holding self.config_lock... and once
   310        # the lock is in play at all, we're gonna time it, in case my belief
   311        # that grabbing the lock here is always effectively free turns out to
   312        # be wrong.
   313
   314        with self.config_lock:
   315            self.ir = None  # don't update unless you hold config_lock
   316            self.econf = None  # don't update unless you hold config_lock
   317            self.diag = None  # don't update unless you hold config_lock
   318
   319        self.stats_updater = None
   320        self.scout_checker = None
   321
   322        self.last_request_info = {}
   323        self.last_request_time = None
   324
   325        # self.scout = Scout(update_frequency=datetime.timedelta(seconds=10))
   326        self.scout = Scout(local_only=self.local_scout)
   327
   328        ProcessCollector(namespace="ambassador", registry=self.metrics_registry)
   329        metrics_info = Info(
   330            name="diagnostics",
   331            namespace="ambassador",
   332            documentation="Ambassador diagnostic info",
   333            registry=self.metrics_registry,
   334        )
   335        metrics_info.info(
   336            {
   337                "version": __version__,
   338                "ambassador_id": Config.ambassador_id,
   339                "cluster_id": os.environ.get(
   340                    "AMBASSADOR_CLUSTER_ID",
   341                    os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
   342                ),
   343                "single_namespace": str(Config.single_namespace),
   344            }
   345        )
   346
   347    @property
   348    def diag(self) -> Optional[Diagnostics]:
   349        """
   350        It turns out to be expensive to generate the Diagnostics class, so
   351        app.diag is a property that does it on demand, handling Timers and
   352        the config lock for you.
   353
   354        You MUST NOT already hold the config_lock or the diag_lock when
   355        trying to read app.diag.
   356
   357        You MUST already have loaded an IR.
   358        """
   359
   360        # The config_lock is meant to make sure that we don't ever update
   361        # self.diag in two places at once, so grab that first.
   362        with self.config_lock:
   363            # If we've already generated diagnostics...
   364            if app._diag:
   365                # ...then we're good to go.
   366                return app._diag
   367
   368        # If here, we have _not_ generated diagnostics, and we've dropped the
   369        # config lock so as not to block anyone else. Next up: grab the diag
   370        # lock, because we'd rather not have two diag generations happening at
   371        # once.
   372        with self.diag_lock:
   373            # Did someone else sneak in between releasing the config lock and
   374            # grabbing the diag lock?
   375            if app._diag:
   376                # Yup. Use their work.
   377                return app._diag
   378
   379            # OK, go generate diagnostics.
   380            _diag = self._generate_diagnostics()
   381
   382            # If that didn't work, no point in messing with the config lock.
   383            if not _diag:
   384                return None
   385
   386            # Once here, we need to - once again - grab the config lock to update
   387            # app._diag. This is safe because this is the only place we ever mess
   388            # with the diag lock, so nowhere else will try to grab the diag lock
   389            # while holding the config lock.
   390            with app.config_lock:
   391                app._diag = _diag
   392
   393            # Finally, we can return app._diag to our caller.
   394            return app._diag
   395
   396    @diag.setter
   397    def diag(self, diag: Optional[Diagnostics]) -> None:
   398        """
   399        It turns out to be expensive to generate the Diagnostics class, so
   400        app.diag is a property that does it on demand, handling Timers and
   401        the config lock for you.
   402
   403        You MUST already hold the config_lock when trying to update app.diag.
   404        You MUST NOT hold the diag_lock.
   405        """
   406        self._diag = diag
   407
   408    def _generate_diagnostics(self) -> Optional[Diagnostics]:
   409        """
   410        Do the heavy lifting of generating Diagnostics for our current configuration.
   411        Really, only the diag() property should be calling this method, but if you
   412        are convinced that you need to call it from elsewhere:
   413
   414        1. You're almost certainly wrong about needing to call it from elsewhere.
   415        2. You MUST hold the diag_lock when calling this method.
   416        3. You MUST NOT hold the config_lock when calling this method.
   417        4. No, really, you're wrong. Don't call this method from anywhere but the
   418           diag() property.
   419        """
   420
   421        # Make sure we have an IR and econf to work with.
   422        if not app.ir or not app.econf:
   423            # Nope, bail.
   424            return None
   425
   426        # OK, go ahead and generate diagnostics. Use the diag_timer to time
   427        # this.
   428        with self.diag_timer:
   429            _diag = Diagnostics(app.ir, app.econf)
   430
   431            # Update some metrics data points given the new generated Diagnostics
   432            diag_dict = _diag.as_dict()
   433            self.diag_errors.set(len(diag_dict.get("errors", [])))
   434            self.diag_notices.set(len(diag_dict.get("notices", [])))
   435
   436            # Note that we've updated diagnostics, since that might trigger a
   437            # timer log.
   438            self.reconf_stats.mark("diag")
   439
   440            return _diag
   441
   442    def check_scout(self, what: str) -> None:
   443        self.watcher.post("SCOUT", (what, self.ir))
   444
   445    def post_timer_event(self) -> None:
   446        # Post an event to do a timer check.
   447        self.watcher.post("TIMER", None)
   448
   449    def check_timers(self) -> None:
   450        # Actually do the timer check.
   451
   452        if self.reconf_stats.needs_timers():
   453            # OK! Log the timers...
   454
   455            for t in [
   456                self.config_timer,
   457                self.fetcher_timer,
   458                self.aconf_timer,
   459                self.ir_timer,
   460                self.econf_timer,
   461                self.diag_timer,
   462            ]:
   463                if t:
   464                    self.logger.info(t.summary())
   465
   466            # ...and the cache statistics, if we can.
   467            if self.cache:
   468                self.cache.dump_stats()
   469
   470            # Always dump the reconfiguration stats...
   471            self.reconf_stats.dump()
   472
   473            # ...and mark that the timers have been logged.
   474            self.reconf_stats.mark_timers_logged()
   475
   476        # In this case we need to check to see if it's time to do a configuration
   477        # check, too.
   478        if self.reconf_stats.needs_check():
   479            result = False
   480
   481            try:
   482                result = self.check_cache()
   483            except Exception as e:
   484                tb = "\n".join(traceback.format_exception(*sys.exc_info()))
   485                self.logger.error("CACHE: CHECK FAILED: %s\n%s" % (e, tb))
   486
   487            # Mark that the check has happened.
   488            self.reconf_stats.mark_checked(result)
   489
   490    @staticmethod
   491    def json_diff(what: str, j1: str, j2: str) -> str:
   492        output = ""
   493
   494        l1 = j1.split("\n")
   495        l2 = j2.split("\n")
   496
   497        n1 = f"{what} incremental"
   498        n2 = f"{what} nonincremental"
   499
   500        output += "\n--------\n"
   501
   502        for line in difflib.context_diff(l1, l2, fromfile=n1, tofile=n2):
   503            line = line.rstrip()
   504            output += line
   505            output += "\n"
   506
   507        return output
   508
   509    def check_cache(self) -> bool:
   510        # We're going to build a shiny new IR and econf from our existing aconf, and make
   511        # sure everything matches. We will _not_ use the existing cache for this.
   512        #
   513        # For this, make sure we have an IR already...
   514        assert self.aconf
   515        assert self.ir
   516        assert self.econf
   517
   518        # Compute this IR/econf with a new empty cache. It saves a lot of trouble with
   519        # having to delete cache keys from the JSON.
   520
   521        self.logger.debug("CACHE: starting check")
   522        cache = Cache(self.logger)
   523        scc = SecretHandler(app.logger, "check_cache", app.snapshot_path, "check")
   524        ir = IR(self.aconf, secret_handler=scc, cache=cache)
   525        econf = EnvoyConfig.generate(ir, Config.envoy_api_version, cache=cache)
   526
   527        # This is testing code.
   528        # name = list(ir.clusters.keys())[0]
   529        # del(ir.clusters[name])
   530
   531        i1 = self.ir.as_json()
   532        i2 = ir.as_json()
   533
   534        e1 = self.econf.as_json()
   535        e2 = econf.as_json()
   536
   537        result = True
   538        errors = ""
   539
   540        if i1 != i2:
   541            result = False
   542            self.logger.error("CACHE: IR MISMATCH")
   543            errors += "IR diffs:\n"
   544            errors += self.json_diff("IR", i1, i2)
   545
   546        if e1 != e2:
   547            result = False
   548            self.logger.error("CACHE: ENVOY CONFIG MISMATCH")
   549            errors += "econf diffs:\n"
   550            errors += self.json_diff("econf", i1, i2)
   551
   552        if not result:
   553            err_path = os.path.join(self.snapshot_path, "diff-tmp.txt")
   554
   555            open(err_path, "w").write(errors)
   556
   557            snapcount = int(os.environ.get("AMBASSADOR_SNAPSHOT_COUNT", "4"))
   558            snaplist: List[Tuple[str, str]] = []
   559
   560            if snapcount > 0:
   561                # If snapcount is 4, this range statement becomes range(-4, -1)
   562                # which gives [ -4, -3, -2 ], which the list comprehension turns
   563                # into [ ( "-3", "-4" ), ( "-2", "-3" ), ( "-1", "-2" ) ]...
   564                # which is the list of suffixes to rename to rotate the snapshots.
   565
   566                snaplist += [(str(x + 1), str(x)) for x in range(-1 * snapcount, -1)]
   567
   568                # After dealing with that, we need to rotate the current file into -1.
   569                snaplist.append(("", "-1"))
   570
   571            # Whether or not we do any rotation, we need to cycle in the '-tmp' file.
   572            snaplist.append(("-tmp", ""))
   573
   574            for from_suffix, to_suffix in snaplist:
   575                from_path = os.path.join(app.snapshot_path, "diff{}.txt".format(from_suffix))
   576                to_path = os.path.join(app.snapshot_path, "diff{}.txt".format(to_suffix))
   577
   578                try:
   579                    self.logger.debug("rotate: %s -> %s" % (from_path, to_path))
   580                    os.rename(from_path, to_path)
   581                except IOError as e:
   582                    self.logger.debug("skip %s -> %s: %s" % (from_path, to_path, e))
   583                except Exception as e:
   584                    self.logger.debug("could not rename %s -> %s: %s" % (from_path, to_path, e))
   585
   586        self.logger.info("CACHE: check %s" % ("succeeded" if result else "failed"))
   587
   588        return result
   589
   590
   591# get the "templates" directory, or raise "FileNotFoundError" if not found
   592def get_templates_dir():
   593    res_dir = None
   594    try:
   595        # this will fail when not in a distribution
   596        res_dir = resource_filename(Requirement.parse("ambassador"), "templates")
   597    except:
   598        pass
   599
   600    maybe_dirs = [res_dir, os.path.join(os.path.dirname(__file__), "..", "templates")]
   601    for d in maybe_dirs:
   602        if d and os.path.isdir(d):
   603            return d
   604    raise FileNotFoundError
   605
   606
   607# Get the Flask app defined early. Setup happens later.
   608app = DiagApp(__name__, template_folder=get_templates_dir())
   609
   610
   611######## DECORATORS
   612
   613
   614def standard_handler(f):
   615    func_name = getattr(f, "__name__", "<anonymous>")
   616
   617    @functools.wraps(f)
   618    def wrapper(*args, **kwds):
   619        reqid = str(uuid.uuid4()).upper()
   620        prefix = '%s: %s "%s %s"' % (reqid, request.remote_addr, request.method, request.path)
   621
   622        app.logger.debug("%s START" % prefix)
   623
   624        start = datetime.datetime.now()
   625
   626        app.logger.debug("%s handler %s" % (prefix, func_name))
   627
   628        # Getting elements in the `tvars_cache` will make sure eviction happens on `max_age_seconds` TTL
   629        # for removed patch_client rather than waiting to fill `max_len`.
   630        # Looping over a copied list of keys, to prevent mutating tvars_cache while iterating.
   631        for k in list(tvars_cache.keys()):
   632            tvars_cache.get(k)
   633
   634        # Default to the exception case
   635        result_to_log = "server error"
   636        status_to_log = 500
   637        result_log_level = logging.ERROR
   638        result = Response(result_to_log, status_to_log)
   639
   640        try:
   641            result = f(*args, reqid=reqid, **kwds)
   642            if not isinstance(result, Response):
   643                result = Response(f"Invalid handler result {result}", status_to_log)
   644
   645            status_to_log = result.status_code
   646
   647            if (status_to_log // 100) == 2:
   648                result_log_level = logging.INFO
   649                result_to_log = "success"
   650            else:
   651                result_log_level = logging.ERROR
   652                result_to_log = "failure"
   653        except Exception as e:
   654            app.logger.exception(e)
   655
   656        end = datetime.datetime.now()
   657        ms = int(((end - start).total_seconds() * 1000) + 0.5)
   658
   659        app.logger.log(
   660            result_log_level, "%s %dms %d %s" % (prefix, ms, status_to_log, result_to_log)
   661        )
   662
   663        return result
   664
   665    return wrapper
   666
   667
   668def internal_handler(f):
   669    """
   670    Reject requests where the remote address is not localhost. See the docstring
   671    for _is_local_request() for important caveats!
   672    """
   673    func_name = getattr(f, "__name__", "<anonymous>")
   674
   675    @functools.wraps(f)
   676    def wrapper(*args, **kwds):
   677        if not _is_local_request():
   678            return "Forbidden\n", 403
   679        return f(*args, **kwds)
   680
   681    return wrapper
   682
   683
   684######## UTILITIES
   685
   686
   687def _is_local_request() -> bool:
   688    """
   689    Determine if this request originated with localhost.
   690
   691    We rely on healthcheck_server.go setting the X-Ambassador-Diag-IP header for us
   692    (and we rely on it overwriting anything that's already there!).
   693
   694    It might be possible to consider the environment variables SERVER_NAME and
   695    SERVER_PORT instead, as those are allegedly required by WSGI... but attempting
   696    to do so in Flask/GUnicorn yielded a worse implementation that was still not
   697    portable.
   698
   699    """
   700
   701    remote_addr: Optional[str] = ""
   702
   703    remote_addr = request.headers.get("X-Ambassador-Diag-IP")
   704
   705    return remote_addr == "127.0.0.1"
   706
   707
   708def _allow_diag_ui() -> bool:
   709    """
   710    Helper function to check if diag ui traffic is allowed or not
   711    based on the different flags from the config:
   712    * diagnostics.enabled: Enable to diag UI by adding mappings
   713    * diagnostics.allow_non_local: Allow non local traffic
   714                                   even when diagnotics UI is disabled.
   715                                   Mappings are not added for the diag UI
   716                                   but the diagnotics UI is still exposed for
   717                                   the pod IP in the admin port.
   718    * local traffic or not: When diagnotics disagled and allow_non_local is false,
   719                            allow traffic only from localhost clients
   720    """
   721    enabled = False
   722    allow_non_local = False
   723    ir = app.ir
   724    if ir:
   725        enabled = ir.ambassador_module.diagnostics.get("enabled", False)
   726        allow_non_local = ir.ambassador_module.diagnostics.get("allow_non_local", False)
   727    if not enabled and not _is_local_request() and not allow_non_local:
   728        return False
   729    return True
   730
   731
   732class Notices:
   733    def __init__(self, local_config_path: str) -> None:
   734        self.local_path = local_config_path
   735        self.notices: List[Dict[str, str]] = []
   736
   737    def reset(self):
   738        local_notices: List[Dict[str, str]] = []
   739        local_data = ""
   740
   741        try:
   742            local_stream = open(self.local_path, "r")
   743            local_data = local_stream.read()
   744            local_notices = parse_json(local_data)
   745        except OSError:
   746            pass
   747        except:
   748            local_notices.append(
   749                {"level": "ERROR", "message": "bad local notices: %s" % local_data}
   750            )
   751
   752        self.notices = local_notices
   753        # app.logger.info("Notices: after RESET: %s" % dump_json(self.notices))
   754
   755    def post(self, notice):
   756        # app.logger.debug("Notices: POST %s" % notice)
   757        self.notices.append(notice)
   758        # app.logger.info("Notices: after POST: %s" % dump_json(self.notices))
   759
   760    def prepend(self, notice):
   761        # app.logger.debug("Notices: PREPEND %s" % notice)
   762        self.notices.insert(0, notice)
   763        # app.logger.info("Notices: after PREPEND: %s" % dump_json(self.notices))
   764
   765    def extend(self, notices):
   766        for notice in notices:
   767            self.post(notice)
   768
   769
   770def td_format(td_object):
   771    seconds = int(td_object.total_seconds())
   772    periods = [
   773        ("year", 60 * 60 * 24 * 365),
   774        ("month", 60 * 60 * 24 * 30),
   775        ("day", 60 * 60 * 24),
   776        ("hour", 60 * 60),
   777        ("minute", 60),
   778        ("second", 1),
   779    ]
   780
   781    strings = []
   782    for period_name, period_seconds in periods:
   783        if seconds > period_seconds:
   784            period_value, seconds = divmod(seconds, period_seconds)
   785
   786            strings.append(
   787                "%d %s%s" % (period_value, period_name, "" if (period_value == 1) else "s")
   788            )
   789
   790    formatted = ", ".join(strings)
   791
   792    if not formatted:
   793        formatted = "0s"
   794
   795    return formatted
   796
   797
   798def interval_format(seconds, normal_format, now_message):
   799    if seconds >= 1:
   800        return normal_format % td_format(datetime.timedelta(seconds=seconds))
   801    else:
   802        return now_message
   803
   804
   805def system_info(app):
   806    ir = app.ir
   807    debug_mode = False
   808
   809    if ir:
   810        amod = ir.ambassador_module
   811        debug_mode = amod.get("debug_mode", False)
   812
   813        app.logger.debug(f"DEBUG_MODE {debug_mode}")
   814
   815    status_dict = {"config failure": [False, "no configuration loaded"]}
   816
   817    env_status = getattr(app.watcher, "env_status", None)
   818
   819    if env_status:
   820        status_dict = env_status.to_dict()
   821        app.logger.debug(f"status_dict {status_dict}")
   822
   823    return {
   824        "version": __version__,
   825        "hostname": SystemInfo.MyHostName,
   826        "ambassador_id": Config.ambassador_id,
   827        "ambassador_namespace": Config.ambassador_namespace,
   828        "single_namespace": Config.single_namespace,
   829        "knative_enabled": os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "").lower() == "true",
   830        "statsd_enabled": os.environ.get("STATSD_ENABLED", "").lower() == "true",
   831        "endpoints_enabled": Config.enable_endpoints,
   832        "cluster_id": os.environ.get(
   833            "AMBASSADOR_CLUSTER_ID",
   834            os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
   835        ),
   836        "boot_time": boot_time,
   837        "hr_uptime": td_format(datetime.datetime.now() - boot_time),
   838        "latest_snapshot": app.latest_snapshot,
   839        "env_good": getattr(app.watcher, "env_good", False),
   840        "env_failures": getattr(app.watcher, "failure_list", ["no IR loaded"]),
   841        "env_status": status_dict,
   842        "debug_mode": debug_mode,
   843    }
   844
   845
   846def envoy_status(estats: EnvoyStats):
   847    since_boot = interval_format(estats.time_since_boot(), "%s", "less than a second")
   848
   849    since_update = "Never updated"
   850
   851    if estats.time_since_update():
   852        since_update = interval_format(
   853            estats.time_since_update(), "%s ago", "within the last second"
   854        )
   855
   856    return {
   857        "alive": estats.is_alive(),
   858        "ready": estats.is_ready(),
   859        "uptime": since_boot,
   860        "since_update": since_update,
   861    }
   862
   863
   864def drop_serializer_key(d: Dict[Any, Any]) -> Dict[Any, Any]:
   865    """
   866    Delete the "serialization" key (if present) in any dictionary passed in and
   867    return that dictionary. This function is intended to be used as the
   868    object_hook value for json.load[s].
   869    """
   870    _ = d.pop("serialization", None)
   871    return d
   872
   873
   874def filter_keys(d: Dict[Any, Any], keys_to_keep):
   875    unwanted_keys = set(d) - set(keys_to_keep)
   876    for unwanted_key in unwanted_keys:
   877        del d[unwanted_key]
   878
   879
   880def filter_webui(d: Dict[Any, Any]):
   881    filter_keys(
   882        d,
   883        [
   884            "system",
   885            "route_info",
   886            "source_map",
   887            "ambassador_resolvers",
   888            "ambassador_services",
   889            "envoy_status",
   890            "cluster_stats",
   891            "loginfo",
   892            "errors",
   893        ],
   894    )
   895    for ambassador_resolver in d["ambassador_resolvers"]:
   896        filter_keys(ambassador_resolver, ["_source", "kind"])
   897    for route_info in d["route_info"]:
   898        filter_keys(route_info, ["diag_class", "key", "headers", "precedence", "clusters"])
   899        for cluster in route_info["clusters"]:
   900            filter_keys(cluster, ["_hcolor", "type_label", "service", "weight"])
   901
   902
   903@app.route("/_internal/v0/ping", methods=["GET"])
   904@internal_handler
   905def handle_ping():
   906    return "ACK\n", 200
   907
   908
   909@app.route("/_internal/v0/features", methods=["GET"])
   910@internal_handler
   911def handle_features():
   912    # If we don't have an IR yet, do nothing.
   913    #
   914    # We don't bother grabbing the config_lock here because we're not changing
   915    # anything, and an features request hitting at exactly the same moment as
   916    # the first configure is a race anyway. If it fails, that's not a big deal,
   917    # they can try again.
   918    if not app.ir:
   919        app.logger.debug("Features: configuration required first")
   920        return "Can't do features before configuration", 503
   921
   922    return jsonify(app.ir.features()), 200
   923
   924
   925@app.route("/_internal/v0/watt", methods=["POST"])
   926@internal_handler
   927def handle_watt_update():
   928    url = request.args.get("url", None)
   929
   930    if not url:
   931        app.logger.error("error: watt update requested with no URL")
   932        return "error: watt update requested with no URL\n", 400
   933
   934    app.logger.debug("Update requested: watt, %s" % url)
   935
   936    status, info = app.watcher.post("CONFIG", ("watt", url))
   937
   938    return info, status
   939
   940
   941@app.route("/_internal/v0/fs", methods=["POST"])
   942@internal_handler
   943def handle_fs():
   944    path = request.args.get("path", None)
   945
   946    if not path:
   947        app.logger.error("error: update requested with no PATH")
   948        return "error: update requested with no PATH\n", 400
   949
   950    app.logger.debug("Update requested from %s" % path)
   951
   952    status, info = app.watcher.post("CONFIG_FS", path)
   953
   954    return info, status
   955
   956
   957@app.route("/_internal/v0/events", methods=["GET"])
   958@internal_handler
   959def handle_events():
   960    if not app.local_scout:
   961        return "Local Scout is not enabled\n", 400
   962
   963    event_dump = [
   964        (x["local_scout_timestamp"], x["mode"], x["action"], x) for x in app.scout._scout.events
   965    ]
   966
   967    app.logger.debug(f"Event dump {event_dump}")
   968
   969    return jsonify(event_dump)
   970
   971
   972@app.route("/ambassador/v0/favicon.ico", methods=["GET"])
   973def favicon():
   974    template_path = resource_filename(Requirement.parse("ambassador"), "templates")
   975
   976    return send_from_directory(template_path, "favicon.ico")
   977
   978
   979@app.route("/ambassador/v0/check_alive", methods=["GET"])
   980def check_alive():
   981    status = envoy_status(app.estatsmgr.get_stats())
   982
   983    if status["alive"]:
   984        return "ambassador liveness check OK (%s)\n" % status["uptime"], 200
   985    else:
   986        return "ambassador seems to have died (%s)\n" % status["uptime"], 503
   987
   988
   989@app.route("/ambassador/v0/check_ready", methods=["GET"])
   990def check_ready():
   991    if not app.ir:
   992        return "ambassador waiting for config\n", 503
   993
   994    status = envoy_status(app.estatsmgr.get_stats())
   995
   996    if status["ready"]:
   997        return "ambassador readiness check OK (%s)\n" % status["since_update"], 200
   998    else:
   999        return "ambassador not ready (%s)\n" % status["since_update"], 503
  1000
  1001
  1002@app.route("/ambassador/v0/diag/", methods=["GET"])
  1003@standard_handler
  1004def show_overview(reqid=None):
  1005    # If we don't have an IR yet, do nothing.
  1006    #
  1007    # We don't bother grabbing the config_lock here because we're not changing
  1008    # anything, and an overview request hitting at exactly the same moment as
  1009    # the first configure is a race anyway. If it fails, that's not a big deal,
  1010    # they can try again.
  1011    if not app.ir:
  1012        app.logger.debug("OV %s - can't do overview before configuration" % reqid)
  1013        return "Can't do overview before configuration", 503
  1014
  1015    if not _allow_diag_ui():
  1016        return Response("Not found\n", 404)
  1017
  1018    app.logger.debug("OV %s - showing overview" % reqid)
  1019
  1020    # Remember that app.diag is a property that can involve some real expense
  1021    # to compute -- we don't want to call it more than once here, so we cache
  1022    # its value.
  1023    diag = app.diag
  1024
  1025    if app.verbose:
  1026        app.logger.debug("OV %s: DIAG" % reqid)
  1027        app.logger.debug("%s" % dump_json(diag.as_dict(), pretty=True))
  1028
  1029    estats = app.estatsmgr.get_stats()
  1030    ov = diag.overview(request, estats)
  1031
  1032    if app.verbose:
  1033        app.logger.debug("OV %s: OV" % reqid)
  1034        app.logger.debug("%s" % dump_json(ov, pretty=True))
  1035        app.logger.debug("OV %s: collecting errors" % reqid)
  1036
  1037    ddict = collect_errors_and_notices(request, reqid, "overview", diag)
  1038
  1039    banner_content = None
  1040    if app.banner_endpoint and app.ir and app.ir.edge_stack_allowed:
  1041        try:
  1042            response = requests.get(app.banner_endpoint)
  1043            if response.status_code == 200:
  1044                banner_content = response.text
  1045        except Exception as e:
  1046            app.logger.error("could not get banner_content: %s" % e)
  1047
  1048    tvars = dict(
  1049        system=system_info(app),
  1050        envoy_status=envoy_status(estats),
  1051        loginfo=app.estatsmgr.loginfo,
  1052        notices=app.notices.notices,
  1053        banner_content=banner_content,
  1054        **ov,
  1055        **ddict,
  1056    )
  1057
  1058    patch_client = request.args.get("patch_client", None)
  1059    if request.args.get("json", None):
  1060        filter_key = request.args.get("filter", None)
  1061
  1062        if filter_key == "webui":
  1063            filter_webui(tvars)
  1064        elif filter_key:
  1065            return jsonify(tvars.get(filter_key, None))
  1066
  1067        if patch_client:
  1068            # Assume this is the Admin UI. Recursively drop all "serialization"
  1069            # keys. This avoids leaking secrets and generally makes the
  1070            # snapshot a lot smaller without losing information that the Admin
  1071            # UI cares about. We do this below by setting the object_hook
  1072            # parameter of the json.loads(...) call. We have to use python's
  1073            # json library instead of orjson, because orjson does not support
  1074            # the object_hook feature.
  1075
  1076            # Get the previous full representation
  1077            cached_tvars_json = tvars_cache.get(patch_client, dict())
  1078            # Serialize the tvars into a json-string using the same jsonify Flask serializer, then load the json object
  1079            response_content = json.loads(flask_json.dumps(tvars), object_hook=drop_serializer_key)
  1080            # Diff between the previous representation and the current full representation  (http://jsonpatch.com/)
  1081            patch = jsonpatch.make_patch(cached_tvars_json, response_content)
  1082            # Save the current full representation in memory
  1083            tvars_cache[patch_client] = response_content
  1084
  1085            # Return only the diff
  1086            return Response(patch.to_string(), mimetype="application/json")
  1087        else:
  1088            return jsonify(tvars)
  1089    else:
  1090        app.check_scout("overview")
  1091        return Response(render_template("overview.html", **tvars))
  1092
  1093
  1094def collect_errors_and_notices(request, reqid, what: str, diag: Diagnostics) -> Dict:
  1095    loglevel = request.args.get("loglevel", None)
  1096    notice = None
  1097
  1098    if loglevel:
  1099        app.logger.debug("%s %s -- requesting loglevel %s" % (what, reqid, loglevel))
  1100        app.diag_log_level.labels("debug").set(1 if loglevel == "debug" else 0)
  1101
  1102        if not app.estatsmgr.update_log_levels(time.time(), level=loglevel):
  1103            notice = {"level": "WARNING", "message": "Could not update log level!"}
  1104        # else:
  1105        #     return redirect("/ambassador/v0/diag/", code=302)
  1106
  1107    # We need to grab errors and notices from diag.as_dict(), process the errors so
  1108    # they work for the HTML rendering, and post the notices to app.notices. Then we
  1109    # return the dict representation that our caller should work with.
  1110
  1111    ddict = diag.as_dict()
  1112
  1113    # app.logger.debug("ddict %s" % dump_json(ddict, pretty=True))
  1114
  1115    derrors = ddict.pop("errors", {})
  1116
  1117    errors = []
  1118
  1119    for err_key, err_list in derrors.items():
  1120        if err_key == "-global-":
  1121            err_key = ""
  1122
  1123        for err in err_list:
  1124            errors.append((err_key, err["error"]))
  1125
  1126    dnotices = ddict.pop("notices", {})
  1127
  1128    # Make sure that anything about the loglevel gets folded into this set.
  1129    if notice:
  1130        app.notices.prepend(notice)
  1131
  1132    for notice_key, notice_list in dnotices.items():
  1133        for notice in notice_list:
  1134            app.notices.post({"level": "NOTICE", "message": "%s: %s" % (notice_key, notice)})
  1135
  1136    ddict["errors"] = errors
  1137
  1138    return ddict
  1139
  1140
  1141@app.route("/ambassador/v0/diag/<path:source>", methods=["GET"])
  1142@standard_handler
  1143def show_intermediate(source=None, reqid=None):
  1144    # If we don't have an IR yet, do nothing.
  1145    #
  1146    # We don't bother grabbing the config_lock here because we're not changing
  1147    # anything, and an overview request hitting at exactly the same moment as
  1148    # the first configure is a race anyway. If it fails, that's not a big deal,
  1149    # they can try again.
  1150    if not app.ir:
  1151        app.logger.debug(
  1152            "SRC %s - can't do intermediate for %s before configuration" % (reqid, source)
  1153        )
  1154        return "Can't do overview before configuration", 503
  1155
  1156    if not _allow_diag_ui():
  1157        return Response("Not found\n", 404)
  1158
  1159    app.logger.debug("SRC %s - getting intermediate for '%s'" % (reqid, source))
  1160
  1161    # Remember that app.diag is a property that can involve some real expense
  1162    # to compute -- we don't want to call it more than once here, so we cache
  1163    # its value.
  1164    diag = app.diag
  1165
  1166    method = request.args.get("method", None)
  1167    resource = request.args.get("resource", None)
  1168
  1169    estats = app.estatsmgr.get_stats()
  1170    result = diag.lookup(request, source, estats)
  1171
  1172    if app.verbose:
  1173        app.logger.debug("RESULT %s" % dump_json(result, pretty=True))
  1174
  1175    ddict = collect_errors_and_notices(request, reqid, "detail %s" % source, diag)
  1176
  1177    tvars = dict(
  1178        system=system_info(app),
  1179        envoy_status=envoy_status(estats),
  1180        loginfo=app.estatsmgr.loginfo,
  1181        notices=app.notices.notices,
  1182        method=method,
  1183        resource=resource,
  1184        **result,
  1185        **ddict,
  1186    )
  1187
  1188    if request.args.get("json", None):
  1189        key = request.args.get("filter", None)
  1190
  1191        if key:
  1192            return jsonify(tvars.get(key, None))
  1193        else:
  1194            return jsonify(tvars)
  1195    else:
  1196        app.check_scout("detail: %s" % source)
  1197        return Response(render_template("diag.html", **tvars))
  1198
  1199
  1200@app.template_filter("sort_by_key")
  1201def sort_by_key(objects):
  1202    return sorted(objects, key=lambda x: x["key"])
  1203
  1204
  1205@app.template_filter("pretty_json")
  1206def pretty_json(obj):
  1207    if isinstance(obj, dict):
  1208        obj = dict(**obj)
  1209
  1210        keys_to_drop = [key for key in obj.keys() if key.startswith("_")]
  1211
  1212        for key in keys_to_drop:
  1213            del obj[key]
  1214
  1215    return dump_json(obj, pretty=True)
  1216
  1217
  1218@app.template_filter("sort_clusters_by_service")
  1219def sort_clusters_by_service(clusters):
  1220    return sorted(clusters, key=lambda x: x["service"])
  1221    # return sorted([ c for c in clusters.values() ], key=lambda x: x['service'])
  1222
  1223
  1224@app.template_filter("source_lookup")
  1225def source_lookup(name, sources):
  1226    app.logger.debug("%s => sources %s" % (name, sources))
  1227
  1228    source = sources.get(name, {})
  1229
  1230    app.logger.debug("%s => source %s" % (name, source))
  1231
  1232    return source.get("_source", name)
  1233
  1234
  1235@app.route("/metrics", methods=["GET"])
  1236@standard_handler
  1237def get_prometheus_metrics(*args, **kwargs):
  1238    # Envoy metrics
  1239    envoy_metrics = app.estatsmgr.get_prometheus_stats()
  1240
  1241    # Ambassador OSS metrics
  1242    ambassador_metrics = generate_latest(registry=app.metrics_registry).decode("utf-8")
  1243
  1244    # Extra metrics endpoint
  1245    extra_metrics_content = ""
  1246    if app.metrics_endpoint and app.ir and app.ir.edge_stack_allowed:
  1247        try:
  1248            response = requests.get(app.metrics_endpoint)
  1249            if response.status_code == 200:
  1250                extra_metrics_content = response.text
  1251        except Exception as e:
  1252            app.logger.error("could not get metrics_endpoint: %s" % e)
  1253
  1254    return Response(
  1255        "".join([envoy_metrics, ambassador_metrics, extra_metrics_content]).encode("utf-8"),
  1256        200,
  1257        mimetype="text/plain",
  1258    )
  1259
  1260
  1261def bool_fmt(b: bool) -> str:
  1262    return "T" if b else "F"
  1263
  1264
  1265class StatusInfo:
  1266    def __init__(self) -> None:
  1267        self.status = True
  1268        self.specifics: List[Tuple[bool, str]] = []
  1269
  1270    def failure(self, message: str) -> None:
  1271        self.status = False
  1272        self.specifics.append((False, message))
  1273
  1274    def OK(self, message: str) -> None:
  1275        self.specifics.append((True, message))
  1276
  1277    def to_dict(self) -> Dict[str, Union[bool, List[Tuple[bool, str]]]]:
  1278        return {"status": self.status, "specifics": self.specifics}
  1279
  1280
  1281class SystemStatus:
  1282    def __init__(self) -> None:
  1283        self.status: Dict[str, StatusInfo] = {}
  1284
  1285    def failure(self, key: str, message: str) -> None:
  1286        self.info_for_key(key).failure(message)
  1287
  1288    def OK(self, key: str, message: str) -> None:
  1289        self.info_for_key(key).OK(message)
  1290
  1291    def info_for_key(self, key) -> StatusInfo:
  1292        if key not in self.status:
  1293            self.status[key] = StatusInfo()
  1294
  1295        return self.status[key]
  1296
  1297    def to_dict(self) -> Dict[str, Dict[str, Union[bool, List[Tuple[bool, str]]]]]:
  1298        return {key: info.to_dict() for key, info in self.status.items()}
  1299
  1300
  1301class KubeStatus:
  1302    pool: concurrent.futures.ProcessPoolExecutor
  1303
  1304    def __init__(self, app) -> None:
  1305        self.app = app
  1306        self.logger = app.logger
  1307        self.live: Dict[str, bool] = {}
  1308        self.current_status: Dict[str, str] = {}
  1309        self.pool = concurrent.futures.ProcessPoolExecutor(max_workers=5)
  1310
  1311    def mark_live(self, kind: str, name: str, namespace: str) -> None:
  1312        key = f"{kind}/{name}.{namespace}"
  1313
  1314        # self.logger.debug(f"KubeStatus MASTER {os.getpid()}: mark_live {key}")
  1315        self.live[key] = True
  1316
  1317    def prune(self) -> None:
  1318        drop: List[str] = []
  1319
  1320        for key in self.current_status.keys():
  1321            if not self.live.get(key, False):
  1322                drop.append(key)
  1323
  1324        for key in drop:
  1325            # self.logger.debug(f"KubeStatus MASTER {os.getpid()}: prune {key}")
  1326            del self.current_status[key]
  1327
  1328        self.live = {}
  1329
  1330    def post(self, kind: str, name: str, namespace: str, text: str) -> None:
  1331        key = f"{kind}/{name}.{namespace}"
  1332        extant = self.current_status.get(key, None)
  1333
  1334        if extant == text:
  1335            # self.logger.info(f"KubeStatus MASTER {os.getpid()}: {key} == {text}")
  1336            pass
  1337        else:
  1338            # self.logger.info(f"KubeStatus MASTER {os.getpid()}: {key} needs {text}")
  1339
  1340            # For now we're going to assume that this works.
  1341            self.current_status[key] = text
  1342            f = self.pool.submit(kubestatus_update, kind, name, namespace, text)
  1343            f.add_done_callback(kubestatus_update_done)
  1344
  1345
  1346# The KubeStatusNoMappings class clobbers the mark_live() method of the
  1347# KubeStatus class, so that updates to Mappings don't actually have any
  1348# effect, but updates to Ingress (for example) do.
  1349class KubeStatusNoMappings(KubeStatus):
  1350    def mark_live(self, kind: str, name: str, namespace: str) -> None:
  1351        pass
  1352
  1353    def post(self, kind: str, name: str, namespace: str, text: str) -> None:
  1354        # There's a path (via IRBaseMapping.check_status) where a Mapping
  1355        # can be added directly to ir.k8s_status_updates, which will come
  1356        # straight here without mark_live being involved -- so short-circuit
  1357        # here for Mappings, too.
  1358
  1359        if kind == "Mapping":
  1360            return
  1361
  1362        super().post(kind, name, namespace, text)
  1363
  1364
  1365def kubestatus_update(kind: str, name: str, namespace: str, text: str) -> str:
  1366    cmd = [
  1367        "kubestatus",
  1368        "--cache-dir",
  1369        "/tmp/client-go-http-cache",
  1370        kind,
  1371        name,
  1372        "-n",
  1373        namespace,
  1374        "-u",
  1375        "/dev/fd/0",
  1376    ]
  1377    # print(f"KubeStatus UPDATE {os.getpid()}: running command: {cmd}")
  1378
  1379    try:
  1380        rc = subprocess.run(
  1381            cmd,
  1382            input=text.encode("utf-8"),
  1383            stdout=subprocess.PIPE,
  1384            stderr=subprocess.STDOUT,
  1385            timeout=5,
  1386        )
  1387        if rc.returncode == 0:
  1388            return f"{name}.{namespace}: update OK"
  1389        else:
  1390            return f"{name}.{namespace}: error {rc.returncode}"
  1391
  1392    except subprocess.TimeoutExpired as e:
  1393        return f"{name}.{namespace}: timed out\n\n{e.output}"
  1394
  1395
  1396def kubestatus_update_done(f: concurrent.futures.Future) -> None:
  1397    # print(f"KubeStatus DONE {os.getpid()}: result {f.result()}")
  1398    pass
  1399
  1400
  1401class AmbassadorEventWatcher(threading.Thread):
  1402    # The key for 'Actions' is chimed - chimed_ok - env_good. This will make more sense
  1403    # if you read through the _load_ir method.
  1404
  1405    Actions = {
  1406        "F-F-F": ("unhealthy", True),  # make sure the first chime always gets out
  1407        "F-F-T": ("now-healthy", True),  # make sure the first chime always gets out
  1408        "F-T-F": ("now-unhealthy", True),  # this is actually impossible
  1409        "F-T-T": ("healthy", True),  # this is actually impossible
  1410        "T-F-F": ("unhealthy", False),
  1411        "T-F-T": ("now-healthy", True),
  1412        "T-T-F": ("now-unhealthy", True),
  1413        "T-T-T": ("update", False),
  1414    }
  1415
  1416    reCompressed = re.compile(r"-\d+$")
  1417
  1418    def __init__(self, app: DiagApp) -> None:
  1419        super().__init__(name="AEW", daemon=True)
  1420        self.app = app
  1421        self.logger = self.app.logger
  1422        self.events: queue.Queue = queue.Queue()
  1423
  1424        self.chimed = False  # Have we ever sent a chime about the environment?
  1425        self.last_chime = False  # What was the status of our last chime? (starts as False)
  1426        self.env_good = False  # Is our environment currently believed to be OK?
  1427        self.failure_list: List[str] = [
  1428            "unhealthy at boot"
  1429        ]  # What's making our environment not OK?
  1430
  1431    def post(
  1432        self, cmd: str, arg: Optional[Union[str, Tuple[str, Optional[IR]]]]
  1433    ) -> Tuple[int, str]:
  1434        rqueue: queue.Queue = queue.Queue()
  1435
  1436        self.events.put((cmd, arg, rqueue))
  1437
  1438        return rqueue.get()
  1439
  1440    def update_estats(self) -> None:
  1441        self.app.estatsmgr.update()
  1442
  1443    def run(self):
  1444        self.logger.info("starting Scout checker and timer logger")
  1445        self.app.scout_checker = PeriodicTrigger(
  1446            lambda: self.check_scout("checkin"), period=86400
  1447        )  # Yup, one day.
  1448        self.app.timer_logger = PeriodicTrigger(self.app.post_timer_event, period=1)
  1449
  1450        self.logger.info("starting event watcher")
  1451
  1452        while True:
  1453            cmd, arg, rqueue = self.events.get()
  1454            # self.logger.info("EVENT: %s" % cmd)
  1455
  1456            if cmd == "CONFIG_FS":
  1457                try:
  1458                    self.load_config_fs(rqueue, arg)
  1459                except Exception as e:
  1460                    self.logger.error("could not reconfigure: %s" % e)
  1461                    self.logger.exception(e)
  1462                    self._respond(rqueue, 500, "configuration from filesystem failed")
  1463            elif cmd == "CONFIG":
  1464                version, url = arg
  1465
  1466                try:
  1467                    if version == "watt":
  1468                        self.load_config_watt(rqueue, url)
  1469                    else:
  1470                        raise RuntimeError("config from %s not supported" % version)
  1471                except Exception as e:
  1472                    self.logger.error("could not reconfigure: %s" % e)
  1473                    self.logger.exception(e)
  1474                    self._respond(rqueue, 500, "configuration failed")
  1475            elif cmd == "SCOUT":
  1476                try:
  1477                    self._respond(rqueue, 200, "checking Scout")
  1478                    self.check_scout(*arg)
  1479                except Exception as e:
  1480                    self.logger.error("could not reconfigure: %s" % e)
  1481                    self.logger.exception(e)
  1482                    self._respond(rqueue, 500, "scout check failed")
  1483            elif cmd == "TIMER":
  1484                try:
  1485                    self._respond(rqueue, 200, "done")
  1486                    self.app.check_timers()
  1487                except Exception as e:
  1488                    self.logger.error("could not check timers? %s" % e)
  1489                    self.logger.exception(e)
  1490            else:
  1491                self.logger.error(f"unknown event type: '{cmd}' '{arg}'")
  1492                self._respond(rqueue, 400, f"unknown event type '{cmd}' '{arg}'")
  1493
  1494    def _respond(self, rqueue: queue.Queue, status: int, info="") -> None:
  1495        # self.logger.debug("responding to query with %s %s" % (status, info))
  1496        rqueue.put((status, info))
  1497
  1498    # load_config_fs reconfigures from the filesystem. It's _mostly_ legacy
  1499    # code, but not entirely, since Docker demo mode still uses it.
  1500    #
  1501    # BE CAREFUL ABOUT STOPPING THE RECONFIGURATION TIMER ONCE IT IS STARTED.
  1502    def load_config_fs(self, rqueue: queue.Queue, path: str) -> None:
  1503        self.logger.debug("loading configuration from disk: %s" % path)
  1504
  1505        # The "path" here can just be a path, but it can also be a command for testing,
  1506        # if the user has chosen to allow that.
  1507
  1508        if self.app.allow_fs_commands and (":" in path):
  1509            pfx, rest = path.split(":", 1)
  1510
  1511            if pfx.lower() == "cmd":
  1512                fields = rest.split(":", 1)
  1513
  1514                cmd = fields[0].upper()
  1515
  1516                args = fields[1:] if (len(fields) > 1) else None
  1517
  1518                if cmd.upper() == "CHIME":
  1519                    self.logger.info("CMD: Chiming")
  1520
  1521                    self.chime()
  1522
  1523                    self._respond(rqueue, 200, "Chimed")
  1524                elif cmd.upper() == "CHIME_RESET":
  1525                    self.chimed = False
  1526                    self.last_chime = False
  1527                    self.env_good = False
  1528
  1529                    self.app.scout.reset_events()
  1530                    self.app.scout.report(mode="boot", action="boot1", no_cache=True)
  1531
  1532                    self.logger.info("CMD: Reset chime state")
  1533                    self._respond(rqueue, 200, "CMD: Reset chime state")
  1534                elif cmd.upper() == "SCOUT_CACHE_RESET":
  1535                    self.app.scout.reset_cache_time()
  1536
  1537                    self.logger.info("CMD: Reset Scout cache time")
  1538                    self._respond(rqueue, 200, "CMD: Reset Scout cache time")
  1539                elif cmd.upper() == "ENV_OK":
  1540                    self.env_good = True
  1541                    self.failure_list = []
  1542
  1543                    self.logger.info("CMD: Marked environment good")
  1544                    self._respond(rqueue, 200, "CMD: Marked environment good")
  1545                elif cmd.upper() == "ENV_BAD":
  1546                    self.env_good = False
  1547                    self.failure_list = ["failure forced"]
  1548
  1549                    self.logger.info("CMD: Marked environment bad")
  1550                    self._respond(rqueue, 200, "CMD: Marked environment bad")
  1551                else:
  1552                    self.logger.info(f'CMD: no such command "{cmd}"')
  1553                    self._respond(rqueue, 400, f'CMD: no such command "{cmd}"')
  1554
  1555                return
  1556            else:
  1557                self.logger.info(f'CONFIG_FS: invalid prefix "{pfx}"')
  1558                self._respond(rqueue, 400, f'CONFIG_FS: invalid prefix "{pfx}"')
  1559
  1560            return
  1561
  1562        # OK, we're starting a reconfiguration. BE CAREFUL TO STOP THE TIMER
  1563        # BEFORE YOU RESPOND TO THE CALLER.
  1564        self.app.config_timer.start()
  1565
  1566        snapshot = re.sub(r"[^A-Za-z0-9_-]", "_", path)
  1567        scc = FSSecretHandler(app.logger, path, app.snapshot_path, "0")
  1568
  1569        with self.app.fetcher_timer:
  1570            aconf = Config()
  1571            fetcher = ResourceFetcher(app.logger, aconf)
  1572            fetcher.load_from_filesystem(path, k8s=app.k8s, recurse=True)
  1573
  1574        if not fetcher.elements:
  1575            self.logger.debug("no configuration resources found at %s" % path)
  1576            # Don't bail from here -- go ahead and reload the IR.
  1577            #
  1578            # XXX This is basically historical logic, honestly. But if you try
  1579            # to respond from here and bail, STOP THE RECONFIGURATION TIMER.
  1580
  1581        self._load_ir(rqueue, aconf, fetcher, scc, snapshot)
  1582
  1583    # load_config_watt reconfigures from the filesystem. It's the one true way of
  1584    # reconfiguring these days.
  1585    #
  1586    # BE CAREFUL ABOUT STOPPING THE RECONFIGURATION TIMER ONCE IT IS STARTED.
  1587    def load_config_watt(self, rqueue: queue.Queue, url: str):
  1588        snapshot = url.split("/")[-1]
  1589        ss_path = os.path.join(app.snapshot_path, "snapshot-tmp.yaml")
  1590
  1591        # OK, we're starting a reconfiguration. BE CAREFUL TO STOP THE TIMER
  1592        # BEFORE YOU RESPOND TO THE CALLER.
  1593        self.app.config_timer.start()
  1594
  1595        self.logger.debug("copying configuration: watt, %s to %s" % (url, ss_path))
  1596
  1597        # Grab the serialization, and save it to disk too.
  1598        serialization = load_url_contents(self.logger, url, stream2=open(ss_path, "w"))
  1599
  1600        if not serialization:
  1601            self.logger.debug("no data loaded from snapshot %s" % snapshot)
  1602            # We never used to return here. I'm not sure if that's really correct?
  1603            #
  1604            # IF YOU CHANGE THIS, BE CAREFUL TO STOP THE RECONFIGURATION TIMER.
  1605
  1606        # Weirdly, we don't need a special WattSecretHandler: parse_watt knows how to handle
  1607        # the secrets that watt sends.
  1608        scc = SecretHandler(app.logger, url, app.snapshot_path, snapshot)
  1609
  1610        # OK. Time the various configuration sections separately.
  1611
  1612        with self.app.fetcher_timer:
  1613            aconf = Config()
  1614            fetcher = ResourceFetcher(app.logger, aconf)
  1615
  1616            if serialization:
  1617                fetcher.parse_watt(serialization)
  1618
  1619        if not fetcher.elements:
  1620            self.logger.debug("no configuration found in snapshot %s" % snapshot)
  1621
  1622            # Don't actually bail here. If they send over a valid config that happens
  1623            # to have nothing for us, it's still a legit config.
  1624            #
  1625            # IF YOU CHANGE THIS, BE CAREFUL TO STOP THE RECONFIGURATION TIMER.
  1626
  1627        self._load_ir(rqueue, aconf, fetcher, scc, snapshot)
  1628
  1629    # _load_ir is where the heavy lifting of a reconfigure happens.
  1630    #
  1631    # AT THE POINT OF ENTRY, THE RECONFIGURATION TIMER IS RUNNING. DO NOT LEAVE
  1632    # THIS METHOD WITHOUT STOPPING THE RECONFIGURATION TIMER.
  1633    def _load_ir(
  1634        self,
  1635        rqueue: queue.Queue,
  1636        aconf: Config,
  1637        fetcher: ResourceFetcher,
  1638        secret_handler: SecretHandler,
  1639        snapshot: str,
  1640    ) -> None:
  1641        with self.app.aconf_timer:
  1642            aconf.load_all(fetcher.sorted())
  1643
  1644            # TODO(Flynn): This is an awful hack. Have aconf.load(fetcher) that does
  1645            # this correctly.
  1646            #
  1647            # I'm not doing this at this moment because aconf.load_all() is called in a
  1648            # lot of places, and I don't want to destablize 2.2.2.
  1649            aconf.load_invalid(fetcher)
  1650
  1651        aconf_path = os.path.join(app.snapshot_path, "aconf-tmp.json")
  1652        open(aconf_path, "w").write(aconf.as_json())
  1653
  1654        # OK. What kind of reconfiguration are we doing?
  1655        config_type, reset_cache, invalidate_groups_for = IR.check_deltas(
  1656            self.logger, fetcher, self.app.cache
  1657        )
  1658
  1659        if reset_cache:
  1660            self.logger.debug("RESETTING CACHE")
  1661            self.app.cache = Cache(self.logger)
  1662
  1663        with self.app.ir_timer:
  1664            ir = IR(
  1665                aconf,
  1666                secret_handler=secret_handler,
  1667                invalidate_groups_for=invalidate_groups_for,
  1668                cache=self.app.cache,
  1669            )
  1670
  1671        ir_path = os.path.join(app.snapshot_path, "ir-tmp.json")
  1672        open(ir_path, "w").write(ir.as_json())
  1673
  1674        with self.app.econf_timer:
  1675            self.logger.debug(
  1676                "generating envoy configuration with api version %s" % Config.envoy_api_version
  1677            )
  1678            econf = EnvoyConfig.generate(ir, Config.envoy_api_version, cache=self.app.cache)
  1679
  1680        # DON'T generate the Diagnostics here, because that turns out to be expensive.
  1681        # Instead, we'll just reset app.diag to None, then generate it on-demand when
  1682        # we need it.
  1683        #
  1684        # DO go ahead and split the Envoy config into its components for later, though.
  1685        bootstrap_config, ads_config, clustermap = econf.split_config()
  1686
  1687        # OK. Assume that the Envoy config is valid...
  1688        econf_is_valid = True
  1689        econf_bad_reason = ""
  1690
  1691        # ...then look for reasons it's not valid.
  1692        if not econf.has_listeners():
  1693            # No listeners == something in the Ambassador config is totally horked.
  1694            # Probably this is the user not defining any Hosts that match
  1695            # the Listeners in the system.
  1696            #
  1697            # As it happens, Envoy is OK running a config with no listeners, and it'll
  1698            # answer on port 8001 for readiness checks, so... log a notice, but run with it.
  1699            self.logger.warning(
  1700                "No active listeners at all; check your Listener and Host configuration"
  1701            )
  1702        elif not self.validate_envoy_config(
  1703            ir, config=ads_config, retries=self.app.validation_retries
  1704        ):
  1705            # Invalid Envoy config probably indicates a bug in Emissary itself. Sigh.
  1706            econf_is_valid = False
  1707            econf_bad_reason = "invalid envoy configuration generated"
  1708
  1709        # OK. Is the config invalid?
  1710        if not econf_is_valid:
  1711            # BZzzt. Don't post this update.
  1712            self.logger.info(
  1713                "no update performed (%s), continuing with current configuration..."
  1714                % econf_bad_reason
  1715            )
  1716
  1717            # Don't use app.check_scout; it will deadlock.
  1718            self.check_scout("attempted bad update")
  1719
  1720            # DO stop the reconfiguration timer before leaving.
  1721            self.app.config_timer.stop()
  1722            self._respond(
  1723                rqueue, 500, "ignoring (%s) in snapshot %s" % (econf_bad_reason, snapshot)
  1724            )
  1725            return
  1726
  1727        snapcount = int(os.environ.get("AMBASSADOR_SNAPSHOT_COUNT", "4"))
  1728        snaplist: List[Tuple[str, str]] = []
  1729
  1730        if snapcount > 0:
  1731            self.logger.debug("rotating snapshots for snapshot %s" % snapshot)
  1732
  1733            # If snapcount is 4, this range statement becomes range(-4, -1)
  1734            # which gives [ -4, -3, -2 ], which the list comprehension turns
  1735            # into [ ( "-3", "-4" ), ( "-2", "-3" ), ( "-1", "-2" ) ]...
  1736            # which is the list of suffixes to rename to rotate the snapshots.
  1737
  1738            snaplist += [(str(x + 1), str(x)) for x in range(-1 * snapcount, -1)]
  1739
  1740            # After dealing with that, we need to rotate the current file into -1.
  1741            snaplist.append(("", "-1"))
  1742
  1743        # Whether or not we do any rotation, we need to cycle in the '-tmp' file.
  1744        snaplist.append(("-tmp", ""))
  1745
  1746        for from_suffix, to_suffix in snaplist:
  1747            for fmt in ["aconf{}.json", "econf{}.json", "ir{}.json", "snapshot{}.yaml"]:
  1748                from_path = os.path.join(app.snapshot_path, fmt.format(from_suffix))
  1749                to_path = os.path.join(app.snapshot_path, fmt.format(to_suffix))
  1750
  1751                # Make sure we don't leave this method on error! The reconfiguration
  1752                # timer is still running, but also, the snapshots are a debugging aid:
  1753                # if we can't rotate them, meh, whatever.
  1754
  1755                try:
  1756                    self.logger.debug("rotate: %s -> %s" % (from_path, to_path))
  1757                    os.rename(from_path, to_path)
  1758                except IOError as e:
  1759                    self.logger.debug("skip %s -> %s: %s" % (from_path, to_path, e))
  1760                except Exception as e:
  1761                    self.logger.debug("could not rename %s -> %s: %s" % (from_path, to_path, e))
  1762
  1763        app.latest_snapshot = snapshot
  1764        self.logger.debug("saving Envoy configuration for snapshot %s" % snapshot)
  1765
  1766        with open(app.bootstrap_path, "w") as output:
  1767            output.write(dump_json(bootstrap_config, pretty=True))
  1768
  1769        with open(app.ads_path, "w") as output:
  1770            output.write(dump_json(ads_config, pretty=True))
  1771
  1772        with open(app.clustermap_path, "w") as output:
  1773            output.write(dump_json(clustermap, pretty=True))
  1774
  1775        with app.config_lock:
  1776            app.aconf = aconf
  1777            app.ir = ir
  1778            app.econf = econf
  1779
  1780            # Force app.diag to None so that it'll be regenerated on-demand.
  1781            app.diag = None
  1782
  1783        # We're finally done with the whole configuration process.
  1784        self.app.config_timer.stop()
  1785
  1786        if app.kick:
  1787            self.logger.debug("running '%s'" % app.kick)
  1788            os.system(app.kick)
  1789        elif app.ambex_pid != 0:
  1790            self.logger.debug("notifying PID %d ambex" % app.ambex_pid)
  1791            os.kill(app.ambex_pid, signal.SIGHUP)
  1792
  1793        # don't worry about TCPMappings yet
  1794        mappings = app.aconf.get_config("mappings")
  1795
  1796        if mappings:
  1797            for mapping_name, mapping in mappings.items():
  1798                app.kubestatus.mark_live(
  1799                    "Mapping", mapping_name, mapping.get("namespace", Config.ambassador_namespace)
  1800                )
  1801
  1802        app.kubestatus.prune()
  1803
  1804        if app.ir.k8s_status_updates:
  1805            update_count = 0
  1806
  1807            for name in app.ir.k8s_status_updates.keys():
  1808                update_count += 1
  1809                # Strip off any namespace in the name.
  1810                resource_name = name.split(".", 1)[0]
  1811                kind, namespace, update = app.ir.k8s_status_updates[name]
  1812                text = dump_json(update)
  1813
  1814                # self.logger.debug(f"K8s status update: {kind} {resource_name}.{namespace}, {text}...")
  1815
  1816                app.kubestatus.post(kind, resource_name, namespace, text)
  1817
  1818        group_count = len(app.ir.groups)
  1819        cluster_count = len(app.ir.clusters)
  1820        listener_count = len(app.ir.listeners)
  1821        service_count = len(app.ir.services)
  1822
  1823        self._respond(
  1824            rqueue, 200, "configuration updated (%s) from snapshot %s" % (config_type, snapshot)
  1825        )
  1826
  1827        self.logger.info(
  1828            "configuration updated (%s) from snapshot %s (S%d L%d G%d C%d)"
  1829            % (config_type, snapshot, service_count, listener_count, group_count, cluster_count)
  1830        )
  1831
  1832        # Remember that we've reconfigured.
  1833        self.app.reconf_stats.mark(config_type)
  1834
  1835        if app.health_checks and not app.stats_updater:
  1836            app.logger.debug("starting Envoy status updater")
  1837            app.stats_updater = PeriodicTrigger(app.watcher.update_estats, period=5)
  1838
  1839        # Check our environment...
  1840        self.check_environment()
  1841
  1842        self.chime()
  1843
  1844    def chime(self):
  1845        # In general, our reports here should be action "update", and they should honor the
  1846        # Scout cache, but we need to tweak that depending on whether we've done this before
  1847        # and on whether the environment looks OK.
  1848
  1849        already_chimed = bool_fmt(self.chimed)
  1850        was_ok = bool_fmt(self.last_chime)
  1851        now_ok = bool_fmt(self.env_good)
  1852
  1853        # Poor man's state machine...
  1854        action_key = f"{already_chimed}-{was_ok}-{now_ok}"
  1855        action, no_cache = AmbassadorEventWatcher.Actions[action_key]
  1856
  1857        self.logger.debug(f"CHIME: {action_key}")
  1858
  1859        chime_args = {"no_cache": no_cache, "failures": self.failure_list}
  1860
  1861        if self.app.report_action_keys:
  1862            chime_args["action_key"] = action_key
  1863
  1864        # Don't use app.check_scout; it will deadlock.
  1865        self.check_scout(action, **chime_args)
  1866
  1867        # Remember that we have now chimed...
  1868        self.chimed = True
  1869
  1870        # ...and remember what we sent for that chime.
  1871        self.last_chime = self.env_good
  1872
  1873    def check_environment(self, ir: Optional[IR] = None) -> None:
  1874        env_good = True
  1875        chime_failures = {}
  1876        env_status = SystemStatus()
  1877
  1878        error_count = 0
  1879        tls_count = 0
  1880        mapping_count = 0
  1881
  1882        if not ir:
  1883            ir = app.ir
  1884
  1885        if not ir:
  1886            chime_failures["no config loaded"] = True
  1887            env_good = False
  1888        else:
  1889            if not ir.aconf:
  1890                chime_failures["completely empty config"] = True
  1891                env_good = False
  1892            else:
  1893                for err_key, err_list in ir.aconf.errors.items():
  1894                    if err_key == "-global-":
  1895                        err_key = ""
  1896
  1897                    for err in err_list:
  1898                        error_count += 1
  1899                        err_text = err["error"]
  1900
  1901                        self.app.logger.info(f"error {err_key} {err_text}")
  1902
  1903                        if err_text.find("CRD") >= 0:
  1904                            if err_text.find("core") >= 0:
  1905                                chime_failures["core CRDs"] = True
  1906                                env_status.failure("CRDs", "Core CRD type definitions are missing")
  1907                            else:
  1908                                chime_failures["other CRDs"] = True
  1909                                env_status.failure(
  1910                                    "CRDs", "Resolver CRD type definitions are missing"
  1911                                )
  1912
  1913                            env_good = False
  1914                        elif err_text.find("TLS") >= 0:
  1915                            chime_failures["TLS errors"] = True
  1916                            env_status.failure("TLS", err_text)
  1917
  1918                            env_good = False
  1919
  1920            for context in ir.tls_contexts:
  1921                if context:
  1922                    tls_count += 1
  1923                    break
  1924
  1925            for group in ir.groups.values():
  1926                for mapping in group.mappings:
  1927                    pfx = mapping.get("prefix", None)
  1928                    name = mapping.get("name", None)
  1929
  1930                    if pfx:
  1931                        if not pfx.startswith("/ambassador/v0") or not name.startswith("internal_"):
  1932                            mapping_count += 1
  1933
  1934        if error_count:
  1935            env_status.failure(
  1936                "Error check",
  1937                f'{error_count} total error{"" if (error_count == 1) else "s"} logged',
  1938            )
  1939            env_good = False
  1940        else:
  1941            env_status.OK("Error check", "No errors logged")
  1942
  1943        if tls_count:
  1944            env_status.OK(
  1945                "TLS", f'{tls_count} TLSContext{" is" if (tls_count == 1) else "s are"} active'
  1946            )
  1947        else:
  1948            chime_failures["no TLS contexts"] = True
  1949            env_status.failure("TLS", "No TLSContexts are active")
  1950
  1951            env_good = False
  1952
  1953        if mapping_count:
  1954            env_status.OK(
  1955                "Mappings",
  1956                f'{mapping_count} Mapping{" is" if (mapping_count == 1) else "s are"} active',
  1957            )
  1958        else:
  1959            chime_failures["no Mappings"] = True
  1960            env_status.failure("Mappings", "No Mappings are active")
  1961            env_good = False
  1962
  1963        failure_list: List[str] = []
  1964
  1965        if not env_good:
  1966            failure_list = list(sorted(chime_failures.keys()))
  1967
  1968        self.env_good = env_good
  1969        self.env_status = env_status
  1970        self.failure_list = failure_list
  1971
  1972    def check_scout(
  1973        self,
  1974        what: str,
  1975        no_cache: Optional[bool] = False,
  1976        ir: Optional[IR] = None,
  1977        failures: Optional[List[str]] = None,
  1978        action_key: Optional[str] = None,
  1979    ) -> None:
  1980        now = datetime.datetime.now()
  1981        uptime = now - boot_time
  1982        hr_uptime = td_format(uptime)
  1983
  1984        if not ir:
  1985            ir = app.ir
  1986
  1987        self.app.notices.reset()
  1988
  1989        scout_args = {"uptime": int(uptime.total_seconds()), "hr_uptime": hr_uptime}
  1990
  1991        if failures:
  1992            scout_args["failures"] = failures
  1993
  1994        if action_key:
  1995            scout_args["action_key"] = action_key
  1996
  1997        if ir:
  1998            self.app.logger.debug("check_scout: we have an IR")
  1999
  2000            if not os.environ.get("AMBASSADOR_DISABLE_FEATURES", None):
  2001                self.app.logger.debug("check_scout: including features")
  2002                feat = ir.features()
  2003
  2004                # Include features about the cache and incremental reconfiguration,
  2005                # too.
  2006
  2007                if self.app.cache is not None:
  2008                    # Fast reconfigure is on. Supply the real info.
  2009                    feat["frc_enabled"] = True
  2010                    feat["frc_cache_hits"] = self.app.cache.hits
  2011                    feat["frc_cache_misses"] = self.app.cache.misses
  2012                    feat["frc_inv_calls"] = self.app.cache.invalidate_calls
  2013                    feat["frc_inv_objects"] = self.app.cache.invalidated_objects
  2014                else:
  2015                    # Fast reconfigure is off.
  2016                    feat["frc_enabled"] = False
  2017
  2018                # Whether the cache is on or off, we can talk about reconfigurations.
  2019                feat["frc_incr_count"] = self.app.reconf_stats.counts["incremental"]
  2020                feat["frc_complete_count"] = self.app.reconf_stats.counts["complete"]
  2021                feat["frc_check_count"] = self.app.reconf_stats.checks
  2022                feat["frc_check_errors"] = self.app.reconf_stats.errors
  2023
  2024                request_data = app.estatsmgr.get_stats().requests
  2025
  2026                if request_data:
  2027                    self.app.logger.debug("check_scout: including requests")
  2028
  2029                    for rkey in request_data.keys():
  2030                        cur = request_data[rkey]
  2031                        prev = app.last_request_info.get(rkey, 0)
  2032                        feat[f"request_{rkey}_count"] = max(cur - prev, 0)
  2033
  2034                    lrt = app.last_request_time or boot_time
  2035                    since_lrt = now - lrt
  2036                    elapsed = since_lrt.total_seconds()
  2037                    hr_elapsed = td_format(since_lrt)
  2038
  2039                    app.last_request_time = now
  2040                    app.last_request_info = request_data
  2041
  2042                    feat["request_elapsed"] = elapsed
  2043                    feat["request_hr_elapsed"] = hr_elapsed
  2044
  2045                scout_args["features"] = feat
  2046
  2047        scout_result = self.app.scout.report(
  2048            mode="diagd", action=what, no_cache=no_cache, **scout_args
  2049        )
  2050        scout_notices = scout_result.pop("notices", [])
  2051
  2052        global_loglevel = self.app.logger.getEffectiveLevel()
  2053
  2054        self.app.logger.debug(f"Scout section: global loglevel {global_loglevel}")
  2055
  2056        for notice in scout_notices:
  2057            notice_level_name = notice.get("level") or "INFO"
  2058            notice_level = logging.getLevelName(notice_level_name)
  2059
  2060            if notice_level >= global_loglevel:
  2061                self.app.logger.debug(f"Scout section: include {notice}")
  2062                self.app.notices.post(notice)
  2063            else:
  2064                self.app.logger.debug(f"Scout section: skip {notice}")
  2065
  2066        self.app.logger.debug("Scout reports %s" % dump_json(scout_result))
  2067        self.app.logger.debug("Scout notices: %s" % dump_json(scout_notices))
  2068        self.app.logger.debug("App notices after scout: %s" % dump_json(app.notices.notices))
  2069
  2070    def validate_envoy_config(self, ir: IR, config, retries) -> bool:
  2071        if self.app.no_envoy:
  2072            self.app.logger.debug("Skipping validation")
  2073            return True
  2074
  2075        # We want to keep the original config untouched
  2076        validation_config = copy.deepcopy(config)
  2077
  2078        # Envoy fails to validate with @type field in envoy config, so removing that
  2079        validation_config.pop("@type")
  2080
  2081        if os.environ.get("AMBASSADOR_DEBUG_CLUSTER_CONFIG", "false").lower() == "true":
  2082            vconf_clusters = validation_config["static_resources"]["clusters"]
  2083
  2084            if len(vconf_clusters) > 10:
  2085                vconf_clusters.append(copy.deepcopy(vconf_clusters[10]))
  2086
  2087            # Check for cluster-name weirdness.
  2088            _v2_clusters = {}
  2089            _problems = []
  2090
  2091            for name in sorted(ir.clusters.keys()):
  2092                if AmbassadorEventWatcher.reCompressed.search(name):
  2093                    _problems.append(f"IR pre-compressed cluster {name}")
  2094
  2095            for cluster in validation_config["static_resources"]["clusters"]:
  2096                name = cluster["name"]
  2097
  2098                if name in _v2_clusters:
  2099                    _problems.append(f"V2 dup cluster {name}")
  2100                _v2_clusters[name] = True
  2101
  2102            if _problems:
  2103                self.logger.error("ENVOY CONFIG PROBLEMS:\n%s", "\n".join(_problems))
  2104                stamp = datetime.datetime.now().isoformat()
  2105
  2106                bad_snapshot = open(
  2107                    os.path.join(app.snapshot_path, "snapshot-tmp.yaml"), "r"
  2108                ).read()
  2109
  2110                cache_dict: Dict[str, Any] = {}
  2111                cache_links: Dict[str, Any] = {}
  2112
  2113                if self.app.cache:
  2114                    for k, c in self.app.cache.cache.items():
  2115                        v: Any = c[0]
  2116
  2117                        if getattr(v, "as_dict", None):
  2118                            v = v.as_dict()
  2119
  2120                        cache_dict[k] = v
  2121
  2122                    cache_links = {k: list(v) for k, v in self.app.cache.links.items()}
  2123
  2124                bad_dict = {
  2125                    "ir": ir.as_dict(),
  2126                    "v2": config,
  2127                    "validation": validation_config,
  2128                    "problems": _problems,
  2129                    "snapshot": bad_snapshot,
  2130                    "cache": cache_dict,
  2131                    "links": cache_links,
  2132                }
  2133
  2134                bad_dict_str = dump_json(bad_dict, pretty=True)
  2135                with open(os.path.join(app.snapshot_path, f"problems-{stamp}.json"), "w") as output:
  2136                    output.write(bad_dict_str)
  2137
  2138        config_json = dump_json(validation_config, pretty=True)
  2139
  2140        econf_validation_path = os.path.join(app.snapshot_path, "econf-tmp.json")
  2141
  2142        with open(econf_validation_path, "w") as output:
  2143            output.write(config_json)
  2144
  2145        command = [
  2146            "envoy",
  2147            "--service-node",
  2148            "test-id",
  2149            "--service-cluster",
  2150            ir.ambassador_nodename,
  2151            "--config-path",
  2152            econf_validation_path,
  2153            "--mode",
  2154            "validate",
  2155        ]
  2156        if Config.envoy_api_version == "V2":
  2157            command.extend(["--bootstrap-version", "2"])
  2158
  2159        v_exit = 0
  2160        v_encoded = "".encode("utf-8")
  2161
  2162        # Try to validate the Envoy config. Short circuit and fall through
  2163        # immediately on concrete success or failure, and retry (up to the
  2164        # limit) on timeout.
  2165        #
  2166        # The default timeout is 5s, but this can be overridden in the Ambassador
  2167        # module.
  2168
  2169        amod = ir.ambassador_module
  2170        timeout = amod.envoy_validation_timeout if amod else IRAmbassador.default_validation_timeout
  2171
  2172        # If the timeout is zero, don't do the validation.
  2173        if timeout == 0:
  2174            self.logger.debug("not validating Envoy configuration since timeout is 0")
  2175            return True
  2176
  2177        self.logger.debug(f"validating Envoy configuration with timeout {timeout}")
  2178
  2179        for retry in range(retries):
  2180            try:
  2181                v_encoded = subprocess.check_output(
  2182                    command, stderr=subprocess.STDOUT, timeout=timeout
  2183                )
  2184                v_exit = 0
  2185                break
  2186            except subprocess.CalledProcessError as e:
  2187                v_exit = e.returncode
  2188                v_encoded = e.output
  2189                break
  2190            except subprocess.TimeoutExpired as e:
  2191                v_exit = 1
  2192                v_encoded = e.output or "".encode("utf-8")
  2193
  2194                self.logger.warn(
  2195                    "envoy configuration validation timed out after {} seconds{}\n{}".format(
  2196                        timeout,
  2197                        ", retrying..." if retry < retries - 1 else "",
  2198                        v_encoded.decode("utf-8"),
  2199                    )
  2200                )
  2201
  2202                # Don't break here; continue on to the next iteration of the loop.
  2203
  2204        if v_exit == 0:
  2205            self.logger.debug(
  2206                "successfully validated the resulting envoy configuration, continuing..."
  2207            )
  2208            return True
  2209
  2210        v_str = typecast(str, v_encoded)
  2211
  2212        try:
  2213            v_str = v_encoded.decode("utf-8")
  2214        except:
  2215            pass
  2216
  2217        self.logger.error(
  2218            "{}\ncould not validate the envoy configuration above after {} retries, failed with error \n{}\n(exit code {})\nAborting update...".format(
  2219                config_json, retries, v_str, v_exit
  2220            )
  2221        )
  2222        return False
  2223
  2224
  2225class StandaloneApplication(gunicorn.app.base.BaseApplication):
  2226    def __init__(self, app, options=None):
  2227        self.options = options or {}
  2228        self.application = app
  2229        super(StandaloneApplication, self).__init__()
  2230
  2231        # Boot chime. This is basically the earliest point at which we can consider an Ambassador
  2232        # to be "running".
  2233        scout_result = self.application.scout.report(mode="boot", action="boot1", no_cache=True)
  2234        self.application.logger.debug(f"BOOT: Scout result {dump_json(scout_result)}")
  2235        self.application.logger.info(f"Ambassador {__version__} booted")
  2236
  2237    def load_config(self):
  2238        config = dict(
  2239            [
  2240                (key, value)
  2241                for key, value in self.options.items()
  2242                if key in self.cfg.settings and value is not None
  2243            ]
  2244        )
  2245
  2246        for key, value in config.items():
  2247            self.cfg.set(key.lower(), value)
  2248
  2249    def load(self):
  2250        # This is a little weird, but whatever.
  2251        self.application.watcher = AmbassadorEventWatcher(self.application)
  2252        self.application.watcher.start()
  2253
  2254        if self.application.config_path:
  2255            self.application.watcher.post("CONFIG_FS", self.application.config_path)
  2256
  2257        return self.application
  2258
  2259
  2260@click.command()
  2261@click.argument("snapshot-path", type=click.Path(), required=False)
  2262@click.argument("bootstrap-path", type=click.Path(), required=False)
  2263@click.argument("ads-path", type=click.Path(), required=False)
  2264@click.option(
  2265    "--config-path",
  2266    type=click.Path(),
  2267    help="Optional configuration path to scan for Ambassador YAML files",
  2268)
  2269@click.option(
  2270    "--k8s",
  2271    is_flag=True,
  2272    help="If True, assume config_path contains Kubernetes resources (only relevant with config_path)",
  2273)
  2274@click.option(
  2275    "--ambex-pid",
  2276    type=int,
  2277    default=0,
  2278    help="Optional PID to signal with HUP after updating Envoy configuration",
  2279    show_default=True,
  2280)
  2281@click.option("--kick", type=str, help="Optional command to run after updating Envoy configuration")
  2282@click.option(
  2283    "--banner-endpoint",
  2284    type=str,
  2285    default="http://127.0.0.1:8500/banner",
  2286    help="Optional endpoint of extra banner to include",
  2287    show_default=True,
  2288)
  2289@click.option(
  2290    "--metrics-endpoint",
  2291    type=str,
  2292    default="http://127.0.0.1:8500/metrics",
  2293    help="Optional endpoint of extra prometheus metrics to include",
  2294    show_default=True,
  2295)
  2296@click.option("--no-checks", is_flag=True, help="If True, don't do Envoy-cluster health checking")
  2297@click.option("--no-envoy", is_flag=True, help="If True, don't interact with Envoy at all")
  2298@click.option("--reload", is_flag=True, help="If True, run Flask in debug mode for live reloading")
  2299@click.option("--debug", is_flag=True, help="If True, do debug logging")
  2300@click.option(
  2301    "--dev-magic",
  2302    is_flag=True,
  2303    help="If True, override a bunch of things for Datawire dev-loop stuff",
  2304)
  2305@click.option("--verbose", is_flag=True, help="If True, do really verbose debug logging")
  2306@click.option(
  2307    "--workers", type=int, help="Number of workers; default is based on the number of CPUs present"
  2308)
  2309@click.option("--host", type=str, help="Interface on which to listen")
  2310@click.option("--port", type=int, default=-1, help="Port on which to listen", show_default=True)
  2311@click.option("--notices", type=click.Path(), help="Optional file to read for local notices")
  2312@click.option(
  2313    "--validation-retries",
  2314    type=int,
  2315    default=5,
  2316    help="Number of times to retry Envoy configuration validation after a timeout",
  2317    show_default=True,
  2318)
  2319@click.option(
  2320    "--allow-fs-commands",
  2321    is_flag=True,
  2322    help="If true, allow CONFIG_FS to support debug/testing commands",
  2323)
  2324@click.option(
  2325    "--local-scout",
  2326    is_flag=True,
  2327    help="Don't talk to remote Scout at all; keep everything purely local",
  2328)
  2329@click.option("--report-action-keys", is_flag=True, help="Report action keys when chiming")
  2330def main(
  2331    snapshot_path=None,
  2332    bootstrap_path=None,
  2333    ads_path=None,
  2334    *,
  2335    dev_magic=False,
  2336    config_path=None,
  2337    ambex_pid=0,
  2338    kick=None,
  2339    banner_endpoint="http://127.0.0.1:8500/banner",
  2340    metrics_endpoint="http://127.0.0.1:8500/metrics",
  2341    k8s=False,
  2342    no_checks=False,
  2343    no_envoy=False,
  2344    reload=False,
  2345    debug=False,
  2346    verbose=False,
  2347    workers=None,
  2348    port=-1,
  2349    host="",
  2350    notices=None,
  2351    validation_retries=5,
  2352    allow_fs_commands=False,
  2353    local_scout=False,
  2354    report_action_keys=False,
  2355):
  2356    """
  2357    Run the diagnostic daemon.
  2358
  2359    Arguments:
  2360
  2361      SNAPSHOT_PATH
  2362        Path to directory in which to save configuration snapshots and dynamic secrets
  2363
  2364      BOOTSTRAP_PATH
  2365        Path to which to write bootstrap Envoy configuration
  2366
  2367      ADS_PATH
  2368        Path to which to write ADS Envoy configuration
  2369    """
  2370
  2371    enable_fast_reconfigure = parse_bool(os.environ.get("AMBASSADOR_FAST_RECONFIGURE", "true"))
  2372
  2373    if port < 0:
  2374        port = Constants.DIAG_PORT if not enable_fast_reconfigure else Constants.DIAG_PORT_ALT
  2375        # port = Constants.DIAG_PORT
  2376
  2377    if not host:
  2378        host = "0.0.0.0" if not enable_fast_reconfigure else "127.0.0.1"
  2379
  2380    if dev_magic:
  2381        # Override the world.
  2382        os.environ["SCOUT_HOST"] = "127.0.0.1:9999"
  2383        os.environ["SCOUT_HTTPS"] = "no"
  2384
  2385        no_checks = True
  2386        no_envoy = True
  2387
  2388        os.makedirs("/tmp/snapshots", mode=0o755, exist_ok=True)
  2389
  2390        snapshot_path = "/tmp/snapshots"
  2391        bootstrap_path = "/tmp/boot.json"
  2392        ads_path = "/tmp/ads.json"
  2393
  2394        port = 9998
  2395
  2396        allow_fs_commands = True
  2397        local_scout = True
  2398        report_action_keys = True
  2399
  2400    if no_envoy:
  2401        no_checks = True
  2402
  2403    # Create the application itself.
  2404    app.setup(
  2405        snapshot_path,
  2406        bootstrap_path,
  2407        ads_path,
  2408        config_path,
  2409        ambex_pid,
  2410        kick,
  2411        banner_endpoint,
  2412        metrics_endpoint,
  2413        k8s,
  2414        not no_checks,
  2415        no_envoy,
  2416        reload,
  2417        debug,
  2418        verbose,
  2419        notices,
  2420        validation_retries,
  2421        allow_fs_commands,
  2422        local_scout,
  2423        report_action_keys,
  2424        enable_fast_reconfigure,
  2425    )
  2426
  2427    if not workers:
  2428        workers = number_of_workers()
  2429
  2430    gunicorn_config = {
  2431        "bind": "%s:%s" % (host, port),
  2432        # 'workers': 1,
  2433        "threads": workers,
  2434    }
  2435
  2436    app.logger.info(
  2437        "thread count %d, listening on %s" % (gunicorn_config["threads"], gunicorn_config["bind"])
  2438    )
  2439
  2440    StandaloneApplication(app, gunicorn_config).run()
  2441
  2442
  2443if __name__ == "__main__":
  2444    main()

View as plain text