...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ambscout.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador

     1import datetime
     2import logging
     3import os
     4import re
     5from typing import Any, ClassVar, Dict, List, Optional, Union
     6from typing import cast as typecast
     7
     8import semantic_version
     9
    10from .scout import Scout
    11from .utils import dump_json, parse_bool, parse_json
    12from .VERSION import Commit, Version
    13
# A single notice attached to a Scout result: a string-to-string mapping. The
# notices built in this file carry a "level" key and a "message" key.
ScoutNotice = Dict[str, str]
    15
    16
    17class LocalScout:
    18    def __init__(self, logger, app: str, version: str, install_id: str) -> None:
    19        self.logger = logger
    20        self.app = app
    21        self.version = version
    22        self.install_id = install_id
    23
    24        self.events: List[Dict[str, Any]] = []
    25
    26        self.logger.info(f"LocalScout: initialized for {app} {version}: ID {install_id}")
    27
    28    def report(self, **kwargs) -> dict:
    29        self.events.append(kwargs)
    30
    31        mode = kwargs["mode"]
    32        action = kwargs["action"]
    33
    34        now = datetime.datetime.now().timestamp()
    35
    36        kwargs["local_scout_timestamp"] = now
    37
    38        if "timestamp" not in kwargs:
    39            kwargs["timestamp"] = now
    40
    41        self.logger.info(f"LocalScout: mode {mode}, action {action} ({kwargs})")
    42
    43        return {
    44            "latest_version": self.version,
    45            "application": self.app,
    46            "cached": False,
    47            "notices": [{"level": "WARNING", "message": "Using LocalScout, result is faked!"}],
    48            "timestamp": now,
    49        }
    50
    51    def reset_events(self) -> None:
    52        self.events = []
    53
    54
    55class AmbScout:
    56    reTaggedBranch: ClassVar = re.compile(r"^v?(\d+\.\d+\.\d+)(-[a-zA-Z][a-zA-Z]\.\d+)?$")
    57    reGitDescription: ClassVar = re.compile(r"-(\d+)-g([0-9a-f]+)$")
    58
    59    install_id: str
    60    runtime: str
    61    namespace: str
    62
    63    version: str
    64    semver: Optional[semantic_version.Version]
    65
    66    _scout: Optional[Union[Scout, LocalScout]]
    67    _scout_error: Optional[str]
    68
    69    _notices: Optional[List[ScoutNotice]]
    70    _last_result: Optional[dict]
    71    _last_update: Optional[datetime.datetime]
    72    _update_frequency: datetime.timedelta
    73
    74    _latest_version: Optional[str] = None
    75    _latest_semver: Optional[semantic_version.Version] = None
    76
    77    def __init__(
    78        self, install_id=None, update_frequency=datetime.timedelta(hours=12), local_only=False
    79    ) -> None:
    80        if not install_id:
    81            install_id = os.environ.get(
    82                "AMBASSADOR_CLUSTER_ID",
    83                os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
    84            )
    85
    86        self.install_id = install_id
    87        self.runtime = "kubernetes" if os.environ.get("KUBERNETES_SERVICE_HOST", None) else "docker"
    88        self.namespace = os.environ.get("AMBASSADOR_NAMESPACE", "default")
    89
    90        # Allow an environment variable to state whether we're in Edge Stack. But keep the
    91        # existing condition as sufficient, so that there is less of a chance of breaking
    92        # things running in a container with this file present.
    93        self.is_edge_stack = parse_bool(os.environ.get("EDGE_STACK", "false")) or os.path.exists(
    94            "/ambassador/.edge_stack"
    95        )
    96        self.app = "aes" if self.is_edge_stack else "ambassador"
    97        self.version = Version
    98        self.semver = self.get_semver(self.version)
    99
   100        self.logger = logging.getLogger("ambassador.scout")
   101        # self.logger.setLevel(logging.DEBUG)
   102
   103        self.logger.debug("Ambassador version %s built from %s" % (Version, Commit))
   104        self.logger.debug(
   105            "Scout version      %s%s" % (self.version, " - BAD SEMVER" if not self.semver else "")
   106        )
   107        self.logger.debug("Runtime            %s" % self.runtime)
   108        self.logger.debug("Namespace          %s" % self.namespace)
   109
   110        self._scout = None
   111        self._scout_error = None
   112
   113        self._local_only = local_only
   114
   115        self._notices = None
   116        self._last_result = None
   117        self._update_frequency = update_frequency
   118        self._latest_version = None
   119        self._latest_semver = None
   120
   121        self.reset_cache_time()
   122
   123    def reset_cache_time(self) -> None:
   124        self._last_update = datetime.datetime.now() - datetime.timedelta(hours=24)
   125
   126    def reset_events(self) -> None:
   127        if self._local_only:
   128            assert self._scout
   129            typecast(LocalScout, self._scout).reset_events()
   130
   131    def __str__(self) -> str:
   132        return "%s: %s" % (
   133            "OK" if self._scout else "??",
   134            self._scout_error if self._scout_error else "OK",
   135        )
   136
   137    @property
   138    def scout(self) -> Optional[Union[Scout, LocalScout]]:
   139        if not self._scout:
   140            if self._local_only:
   141                self._scout = LocalScout(
   142                    logger=self.logger,
   143                    app=self.app,
   144                    version=self.version,
   145                    install_id=self.install_id,
   146                )
   147                self.logger.debug("LocalScout initialized")
   148            else:
   149                try:
   150                    self._scout = Scout(
   151                        app=self.app, version=self.version, install_id=self.install_id
   152                    )
   153                    self._scout_error = None
   154                    self.logger.debug("Scout connection established")
   155                except OSError as e:
   156                    self._scout = None
   157                    self._scout_error = str(e)
   158                    self.logger.debug(
   159                        "Scout connection failed, will retry later: %s" % self._scout_error
   160                    )
   161
   162        return self._scout
   163
   164    def report(self, force_result: Optional[dict] = None, no_cache=False, **kwargs) -> dict:
   165        _notices: List[ScoutNotice] = []
   166
   167        # Silly, right?
   168        use_cache = not no_cache
   169
   170        env_result = None
   171
   172        if use_cache:
   173            env_result = os.environ.get("AMBASSADOR_SCOUT_RESULT", None)
   174
   175            if env_result:
   176                force_result = parse_json(env_result)
   177
   178        result: Optional[dict] = force_result
   179        result_was_cached: bool = False
   180
   181        if not result:
   182            if "runtime" not in kwargs:
   183                kwargs["runtime"] = self.runtime
   184
   185            if "commit" not in kwargs:
   186                kwargs["commit"] = Commit
   187
   188            # How long since the last Scout update? If it's been more than an hour,
   189            # check Scout again.
   190
   191            now = datetime.datetime.now()
   192
   193            needs_update = True
   194
   195            if use_cache:
   196                if self._last_update:
   197                    since_last_update = now - typecast(datetime.datetime, self._last_update)
   198                    needs_update = since_last_update > self._update_frequency
   199
   200            if needs_update:
   201                if self.scout:
   202                    result = self.scout.report(**kwargs)
   203
   204                    self._last_update = now
   205                    self._last_result = dict(**typecast(dict, result)) if result else None
   206                else:
   207                    result = {"scout": "unavailable: %s" % self._scout_error}
   208                    _notices.append(
   209                        {
   210                            "level": "DEBUG",
   211                            "message": "scout temporarily unavailable: %s" % self._scout_error,
   212                        }
   213                    )
   214
   215                # Whether we could talk to Scout or not, update the timestamp so we don't
   216                # try again too soon.
   217                result_timestamp = datetime.datetime.now()
   218            else:
   219                _notices.append({"level": "DEBUG", "message": "Returning cached result"})
   220                result = dict(**typecast(dict, self._last_result)) if self._last_result else None
   221                result_was_cached = True
   222
   223                # We can't get here unless self._last_update is set.
   224                result_timestamp = typecast(datetime.datetime, self._last_update)
   225        else:
   226            _notices.append({"level": "INFO", "message": "Returning forced Scout result"})
   227            result_timestamp = datetime.datetime.now()
   228
   229        if not self.semver:
   230            _notices.append(
   231                {
   232                    "level": "WARNING",
   233                    "message": "Ambassador has invalid version '%s'??!" % self.version,
   234                }
   235            )
   236
   237        if result:
   238            result["cached"] = result_was_cached
   239        else:
   240            result = {"cached": False}
   241
   242        result["timestamp"] = result_timestamp.timestamp()
   243
   244        # Do version & notices stuff.
   245        if "latest_version" in result:
   246            latest_version = result["latest_version"]
   247            latest_semver = self.get_semver(latest_version)
   248
   249            if latest_semver:
   250                self._latest_version = latest_version
   251                self._latest_semver = latest_semver
   252            else:
   253                _notices.append(
   254                    {
   255                        "level": "WARNING",
   256                        "message": "Scout returned invalid version '%s'??!" % latest_version,
   257                    }
   258                )
   259
   260        if self._latest_semver and ((not self.semver) or (self._latest_semver > self.semver)):
   261            _notices.append(
   262                {
   263                    "level": "INFO",
   264                    "message": "Upgrade available! to Ambassador version %s" % self._latest_semver,
   265                }
   266            )
   267
   268        if "notices" in result:
   269            rnotices = typecast(List[Union[str, ScoutNotice]], result["notices"])
   270
   271            for notice in rnotices:
   272                if isinstance(notice, str):
   273                    _notices.append({"level": "WARNING", "message": notice})
   274                elif isinstance(notice, dict):
   275                    lvl = notice.get("level", "WARNING").upper()
   276                    msg = notice.get("message", None)
   277
   278                    if msg:
   279                        _notices.append({"level": lvl, "message": msg})
   280                else:
   281                    _notices.append({"level": "WARNING", "message": dump_json(notice)})
   282
   283        self._notices = _notices
   284
   285        if self._notices:
   286            result["notices"] = self._notices
   287        else:
   288            result.pop("notices", None)
   289
   290        return result
   291
   292    @staticmethod
   293    def get_semver(version_string: str) -> Optional[semantic_version.Version]:
   294        semver = None
   295
   296        try:
   297            semver = semantic_version.Version(version_string)
   298        except ValueError:
   299            pass
   300
   301        return semver

View as plain text