...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/diagnostics/envoy_stats.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador/diagnostics

     1# Copyright 2018 Datawire. All rights reserved.
     2#
     3# Licensed under the Apache License, Version 2.0 (the "License");
     4# you may not use this file except in compliance with the License.
     5# You may obtain a copy of the License at
     6#
     7#     http://www.apache.org/licenses/LICENSE-2.0
     8#
     9# Unless required by applicable law or agreed to in writing, software
    10# distributed under the License is distributed on an "AS IS" BASIS,
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12# See the License for the specific language governing permissions and
    13# limitations under the License
    14
    15import logging
    16import threading
    17import time
    18from dataclasses import dataclass
    19from dataclasses import field as dc_field
    20from typing import Any, Callable, Dict, List, Optional, Union
    21
    22import requests
    23
    24
    25def percentage(x: float, y: float) -> int:
    26    if y == 0:
    27        return 0
    28    else:
    29        return int(((x * 100) / y) + 0.5)
    30
    31
    32@dataclass(frozen=True)
    33class EnvoyStats:
    34    max_live_age: int = 120
    35    max_ready_age: int = 120
    36    created: float = 0.0
    37    last_update: Optional[float] = None
    38    last_attempt: Optional[float] = None
    39    update_errors: int = 0
    40
    41    # Yes yes yes I know -- the contents of these dicts are not immutable.
    42    # That's OK for now, but realize that you mustn't go munging around altering
    43    # things in here once they're assigned!
    44    requests: Dict[str, Any] = dc_field(default_factory=dict)
    45    clusters: Dict[str, Any] = dc_field(default_factory=dict)
    46    envoy: Dict[str, Any] = dc_field(default_factory=dict)
    47
    48    def is_alive(self) -> bool:
    49        """
    50        Make sure we've heard from Envoy within max_live_age seconds.
    51
    52        If we haven't yet heard from Envoy at all (we've just booted),
    53        consider Envoy alive if we haven't yet been running for max_live_age
    54        seconds -- basically, Envoy gets a grace period to start running at
    55        boot time.
    56        """
    57
    58        epoch = self.last_update
    59
    60        if not epoch:
    61            epoch = self.created
    62
    63        return (time.time() - epoch) <= self.max_live_age
    64
    65    def is_ready(self) -> bool:
    66        """
    67        Make sure we've heard from Envoy within max_ready_age seconds.
    68
    69        If we haven't yet heard from Envoy at all (we've just booted),
    70        then Envoy is not yet ready, and is_ready() returns False.
    71        """
    72
    73        epoch = self.last_update
    74
    75        if not epoch:
    76            return False
    77
    78        return (time.time() - epoch) <= self.max_ready_age
    79
    80    def time_since_boot(self) -> float:
    81        """Return the number of seconds since Envoy booted."""
    82        return time.time() - self.created
    83
    84    def time_since_update(self) -> Optional[float]:
    85        """
    86        Return the number of seconds since we last heard from Envoy, or None if
    87        we've never heard from Envoy.
    88        """
    89
    90        if not self.last_update:
    91            return None
    92        else:
    93            return time.time() - self.last_update
    94
    95    def cluster_stats(self, name: str) -> Dict[str, Union[str, bool]]:
    96        if not self.last_update:
    97            # No updates.
    98            return {
    99                "valid": False,
   100                "reason": "No stats updates have succeeded",
   101                "health": "no stats yet",
   102                "hmetric": "startup",
   103                "hcolor": "grey",
   104            }
   105
   106        # OK, we should be OK.
   107        when = self.last_update
   108        cstat = self.clusters
   109
   110        if name not in cstat:
   111            return {
   112                "valid": False,
   113                "reason": "Cluster %s is not defined" % name,
   114                "health": "undefined cluster",
   115                "hmetric": "undefined cluster",
   116                "hcolor": "orange",
   117            }
   118
   119        cstat = dict(**cstat[name])
   120        cstat.update({"valid": True, "reason": "Cluster %s updated at %d" % (name, when)})
   121
   122        pct = cstat.get("healthy_percent", None)
   123
   124        if pct != None:
   125            color = "green"
   126
   127            if pct < 70:
   128                color = "red"
   129            elif pct < 90:
   130                color = "yellow"
   131
   132            cstat.update({"health": "%d%% healthy" % pct, "hmetric": str(pct), "hcolor": color})
   133        else:
   134            cstat.update(
   135                {
   136                    "health": "Unknown health: no requests yet",
   137                    "hmetric": "waiting",
   138                    "hcolor": "grey",
   139                }
   140            )
   141
   142        return cstat
   143
   144
   145LogLevelFetcher = Callable[[Optional[str]], Optional[str]]
   146EnvoyStatsFetcher = Callable[[], Optional[str]]
   147
   148
   149class EnvoyStatsMgr:
   150    # fetch_log_levels and fetch_envoy_stats are debugging hooks
   151    def __init__(
   152        self,
   153        logger: logging.Logger,
   154        max_live_age: int = 120,
   155        max_ready_age: int = 120,
   156        fetch_log_levels: Optional[LogLevelFetcher] = None,
   157        fetch_envoy_stats: Optional[EnvoyStatsFetcher] = None,
   158    ) -> None:
   159        self.logger = logger
   160        self.loginfo: Dict[str, Union[str, List[str]]] = {}
   161
   162        self.update_lock = threading.Lock()
   163        self.access_lock = threading.Lock()
   164
   165        self.fetch_log_levels = fetch_log_levels or self._fetch_log_levels
   166        self.fetch_envoy_stats = fetch_envoy_stats or self._fetch_envoy_stats
   167
   168        self.stats = EnvoyStats(
   169            created=time.time(), max_live_age=max_live_age, max_ready_age=max_ready_age
   170        )
   171
   172    def _fetch_log_levels(self, level: Optional[str]) -> Optional[str]:
   173        try:
   174            url = "http://127.0.0.1:8001/logging"
   175
   176            if level:
   177                url += "?level=%s" % level
   178
   179            r = requests.post(url)
   180
   181            # OMFG. Querying log levels returns with a 404 code.
   182            if (r.status_code != 200) and (r.status_code != 404):
   183                self.logger.warning("EnvoyStats.update_log_levels failed: %s" % r.text)
   184                return None
   185
   186            return r.text
   187        except Exception as e:
   188            self.logger.warning("EnvoyStats.update_log_levels failed: %s" % e)
   189            return None
   190
   191    def _fetch_envoy_stats(self) -> Optional[str]:
   192        try:
   193            r = requests.get("http://127.0.0.1:8001/stats")
   194
   195            if r.status_code != 200:
   196                self.logger.warning("EnvoyStats.update failed: %s" % r.text)
   197                return None
   198
   199            return r.text
   200        except OSError as e:
   201            self.logger.warning("EnvoyStats.update failed: %s" % e)
   202            return None
   203
   204    def update_log_levels(self, last_attempt: float, level: Optional[str] = None) -> bool:
   205        """
   206        Heavy lifting around updating the Envoy log levels.
   207
   208        You MUST hold the update lock when calling this method.
   209        You MUST NOT hold the access lock when calling this method.
   210
   211        update_log_levels does all the work of talking to Envoy and computing
   212        new stats, then grabs the access_lock just long enough to update the data
   213        structures for others to look at.
   214        """
   215
   216        text = self.fetch_log_levels(level)
   217
   218        if not text:
   219            # Ew.
   220            with self.access_lock:
   221                # EnvoyStats is immutable, so...
   222                new_stats = EnvoyStats(
   223                    max_live_age=self.stats.max_live_age,
   224                    max_ready_age=self.stats.max_ready_age,
   225                    created=self.stats.created,
   226                    last_update=self.stats.last_update,
   227                    last_attempt=last_attempt,  # THIS IS A CHANGE
   228                    update_errors=self.stats.update_errors + 1,  # THIS IS A CHANGE
   229                    requests=self.stats.requests,
   230                    clusters=self.stats.clusters,
   231                    envoy=self.stats.envoy,
   232                )
   233
   234                self.stats = new_stats
   235
   236                return False
   237
   238        levels: Dict[str, Dict[str, bool]] = {}
   239
   240        for line in text.split("\n"):
   241            if not line:
   242                continue
   243
   244            if line.startswith("  "):
   245                (logtype, level) = line[2:].split(": ")
   246
   247                x = levels.setdefault(level, {})
   248                x[logtype] = True
   249
   250        loginfo: Dict[str, Union[str, List[str]]]
   251
   252        if len(levels.keys()) == 1:
   253            loginfo = {"all": list(levels.keys())[0]}
   254        else:
   255            loginfo = {x: list(levels[x].keys()) for x in levels.keys()}
   256
   257        with self.access_lock:
   258            self.loginfo = loginfo
   259            return True
   260
   261    def get_stats(self) -> EnvoyStats:
   262        """
   263        Get the current Envoy stats object, safely.
   264
   265        You MUST NOT hold the access_lock when calling this method.
   266        """
   267
   268        with self.access_lock:
   269            return self.stats
   270
   271    def get_prometheus_stats(self) -> str:
   272        try:
   273            r = requests.get("http://127.0.0.1:8001/stats/prometheus")
   274        except OSError as e:
   275            self.logger.warning("EnvoyStats.get_prometheus_state failed: %s" % e)
   276            return ""
   277
   278        if r.status_code != 200:
   279            self.logger.warning("EnvoyStats.get_prometheus_state failed: %s" % r.text)
   280            return ""
   281        return r.text
   282
   283    def update_envoy_stats(self, last_attempt: float) -> None:
   284        """
   285        Heavy lifting around updating the Envoy stats.
   286
   287        You MUST hold the update lock when calling this method.
   288        You MUST NOT hold the access lock when calling this method.
   289
   290        update_envoy_stats does all the work of talking to Envoy and computing
   291        new stats, then grabs the access_lock just long enough to update the data
   292        structures for others to look at.
   293        """
   294
   295        text = self.fetch_envoy_stats()
   296
   297        if not text:
   298            # EnvoyStats is immutable, so...
   299            new_stats = EnvoyStats(
   300                max_live_age=self.stats.max_live_age,
   301                max_ready_age=self.stats.max_ready_age,
   302                created=self.stats.created,
   303                last_update=self.stats.last_update,
   304                last_attempt=last_attempt,  # THIS IS A CHANGE
   305                update_errors=self.stats.update_errors + 1,  # THIS IS A CHANGE
   306                requests=self.stats.requests,
   307                clusters=self.stats.clusters,
   308                envoy=self.stats.envoy,
   309            )
   310
   311            with self.access_lock:
   312                self.stats = new_stats
   313                return
   314
   315        # Parse stats into a hierarchy.
   316        envoy_stats: Dict[str, Any] = {}  # Ew.
   317
   318        for line in text.split("\n"):
   319            if not line:
   320                continue
   321
   322            # TODO: Splitting from the right is a work-around for the
   323            # following issue: https://github.com/emissary-ingress/emissary/issues/4528
   324            # and needs to be addressed via a behavior change
   325            key, value = line.rsplit(":", 1)
   326            keypath = key.split(".")
   327
   328            node = envoy_stats
   329
   330            for key in keypath[:-1]:
   331                if key not in node:
   332                    node[key] = {}
   333
   334                node = node[key]
   335
   336            value = value.strip()
   337
   338            try:
   339                node[keypath[-1]] = int(value)
   340            except:
   341                continue
   342
   343        # Now dig into clusters a bit more.
   344
   345        requests_info = {}
   346        active_clusters = {}
   347
   348        if ("http" in envoy_stats) and ("ingress_http" in envoy_stats["http"]):
   349            ingress_stats = envoy_stats["http"]["ingress_http"]
   350
   351            requests_total = ingress_stats.get("downstream_rq_total", 0)
   352
   353            requests_4xx = ingress_stats.get("downstream_rq_4xx", 0)
   354            requests_5xx = ingress_stats.get("downstream_rq_5xx", 0)
   355            requests_bad = requests_4xx + requests_5xx
   356
   357            requests_ok = requests_total - requests_bad
   358
   359            requests_info = {
   360                "total": requests_total,
   361                "4xx": requests_4xx,
   362                "5xx": requests_5xx,
   363                "bad": requests_bad,
   364                "ok": requests_ok,
   365            }
   366
   367        if "cluster" in envoy_stats:
   368            for cluster_name in envoy_stats["cluster"]:
   369                cluster = envoy_stats["cluster"][cluster_name]
   370
   371                healthy_percent: Optional[int]
   372
   373                healthy_members = cluster["membership_healthy"]
   374                total_members = cluster["membership_total"]
   375                healthy_percent = percentage(healthy_members, total_members)
   376
   377                update_attempts = cluster["update_attempt"]
   378                update_successes = cluster["update_success"]
   379                update_percent = percentage(update_successes, update_attempts)
   380
   381                upstream_total = cluster.get("upstream_rq_completed", 0)
   382
   383                upstream_4xx = cluster.get("upstream_rq_4xx", 0)
   384                upstream_5xx = cluster.get("upstream_rq_5xx", 0)
   385                upstream_bad = upstream_5xx  # used to include 4XX here, but that seems wrong.
   386
   387                upstream_ok = upstream_total - upstream_bad
   388
   389                if upstream_total > 0:
   390                    healthy_percent = percentage(upstream_ok, upstream_total)
   391                else:
   392                    healthy_percent = None
   393
   394                active_clusters[cluster_name] = {
   395                    "healthy_members": healthy_members,
   396                    "total_members": total_members,
   397                    "healthy_percent": healthy_percent,
   398                    "update_attempts": update_attempts,
   399                    "update_successes": update_successes,
   400                    "update_percent": update_percent,
   401                    "upstream_ok": upstream_ok,
   402                    "upstream_4xx": upstream_4xx,
   403                    "upstream_5xx": upstream_5xx,
   404                    "upstream_bad": upstream_bad,
   405                }
   406
   407        # OK, we're now officially finished with all the hard stuff.
   408        last_update = time.time()
   409
   410        # Finally, set up the new EnvoyStats.
   411        new_stats = EnvoyStats(
   412            max_live_age=self.stats.max_live_age,
   413            max_ready_age=self.stats.max_ready_age,
   414            created=self.stats.created,
   415            last_update=last_update,  # THIS IS A CHANGE
   416            last_attempt=last_attempt,  # THIS IS A CHANGE
   417            update_errors=self.stats.update_errors,
   418            requests=requests_info,  # THIS IS A CHANGE
   419            clusters=active_clusters,  # THIS IS A CHANGE
   420            envoy=envoy_stats,  # THIS IS A CHANGE
   421        )
   422
   423        # Make sure we hold the access_lock while messing with self.stats!
   424        with self.access_lock:
   425            self.stats = new_stats
   426
   427    def update(self) -> None:
   428        """
   429        Update the Envoy stats object, including our take on Envoy's loglevel and
   430        lower-level statistics.
   431
   432        You MUST NOT hold the update lock when calling this method.
   433        You MUST NOT hold the access lock when calling this method.
   434
   435        The first thing that update_envoy_stats does is to acquire the update_lock.
   436        If it cannot do so immediately, it assumes that another update is already
   437        running, and returns without doing anything further.
   438
   439        update_envoy_stats uses update_log_levels and update_envoy_stats to do all
   440        the heavy lifting around talking to Envoy, managing the access_lock, and
   441        actually writing new data into the Envoy stats object.
   442        """
   443
   444        # First up, try bailing early.
   445        if not self.update_lock.acquire(blocking=False):
   446            self.logger.warning("EStats update: skipping due to lock contention")
   447            return
   448
   449        # If here, we have the lock. Make sure it gets released!
   450        try:
   451            # Remember when we started.
   452            last_attempt = time.time()
   453
   454            self.update_log_levels(last_attempt)
   455            self.update_envoy_stats(last_attempt)
   456        except Exception as e:
   457            self.logger.exception("could not update Envoy stats: %s" % e)
   458        finally:
   459            self.update_lock.release()

View as plain text