1#!python
2
3# Copyright 2018 Datawire. All rights reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License
16import concurrent.futures
17import copy
18import datetime
19import difflib
20import functools
21import json
22import logging
23import multiprocessing
24import os
25import queue
26import re
27import signal
28import subprocess
29import sys
30import threading
31import time
32import traceback
33import uuid
34from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
35from typing import cast as typecast
36
37import click
38import gunicorn.app.base
39import jsonpatch
40import requests
41from expiringdict import ExpiringDict
42from flask import Flask, Response
43from flask import json as flask_json
44from flask import jsonify, render_template, request, send_from_directory
45from pkg_resources import Requirement, resource_filename
46from prometheus_client import CollectorRegistry, Gauge, Info, ProcessCollector, generate_latest
47from pythonjsonlogger import jsonlogger
48from typing_extensions import NotRequired, TypedDict
49
50from ambassador import IR, Cache, Config, Diagnostics, EnvoyConfig, Scout, Version
51from ambassador.ambscout import LocalScout
52from ambassador.constants import Constants
53from ambassador.diagnostics import EnvoyStats, EnvoyStatsMgr
54from ambassador.fetch import ResourceFetcher
55from ambassador.ir.irambassador import IRAmbassador
56from ambassador.reconfig_stats import ReconfigStats
57from ambassador.utils import (
58 FSSecretHandler,
59 PeriodicTrigger,
60 SecretHandler,
61 SystemInfo,
62 Timer,
63 dump_json,
64 load_url_contents,
65 parse_bool,
66 parse_json,
67)
68
69if TYPE_CHECKING:
70 from ambassador.ir.irtlscontext import IRTLSContext # pragma: no cover
71
# Expose the Ambassador build version as this module's version.
__version__ = Version

# Wall-clock time at process start; used for uptime reporting in system_info().
boot_time = datetime.datetime.now()

# allows 10 concurrent users, with a request timeout of 60 seconds
tvars_cache = ExpiringDict(max_len=10, max_age_seconds=60)
78
# Configure process-wide logging. If AMBASSADOR_JSON_LOGGING is truthy, emit
# JSON-formatted records (for log aggregators); otherwise plain text at INFO,
# overridable via AES_LOG_LEVEL.
logHandler = None
if parse_bool(os.environ.get("AMBASSADOR_JSON_LOGGING", "false")):
    # NOTE: this format string is handed directly to JsonFormatter -- no
    # %-interpolation is applied to it first -- so field references must use
    # the standard single-'%' logging style. The original doubled the '%'s
    # (and dropped the one on threadName), which only worked by accident of
    # jsonlogger's field-extraction regex.
    jsonFormatter = jsonlogger.JsonFormatter(
        "%(asctime)s %(filename)s %(lineno)d %(process)d %(threadName)s %(levelname)s %(message)s"
    )
    logHandler = logging.StreamHandler()
    logHandler.setFormatter(jsonFormatter)

    # Set the root logger to INFO level and tell it to use the new log handler.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(logHandler)

    # Update all of the other loggers to also use the new log handler.
    loggingManager = getattr(logging.root, "manager", None)
    if loggingManager is not None:
        for name in loggingManager.loggerDict:
            logging.getLogger(name).addHandler(logHandler)
    else:
        print("Could not find a logging manager. Some logging may not be properly JSON formatted!")
else:
    # Default log level
    level = logging.INFO

    # Check for env var log level
    if level_name := os.getenv("AES_LOG_LEVEL"):
        level_number = logging.getLevelName(level_name.upper())

        if isinstance(level_number, int):
            level = level_number

    # Set defaults for all loggers. The '%%' sequences here are deliberate:
    # this string is %-formatted with __version__ first, which collapses them
    # to single '%'s for the logging machinery.
    logging.basicConfig(
        level=level,
        format="%%(asctime)s diagd %s [P%%(process)dT%%(threadName)s] %%(levelname)s: %%(message)s"
        % __version__,
        datefmt="%Y-%m-%d %H:%M:%S",
    )

# Shut up Werkzeug's standard request logs -- they're just too noisy.
logging.getLogger("werkzeug").setLevel(logging.CRITICAL)

# Likewise make requests a bit quieter.
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
124
# Documentation URLs surfaced in diagnostics output for Ambassador resource kinds.
ambassador_targets = {
    "mapping": "https://www.getambassador.io/reference/configuration#mappings",
    "module": "https://www.getambassador.io/reference/configuration#modules",
}

# envoy_targets = {
#     'route': 'https://envoyproxy.github.io/envoy/configuration/http_conn_man/route_config/route.html',
#     'cluster': 'https://envoyproxy.github.io/envoy/configuration/cluster_manager/cluster.html',
# }
134
135
def number_of_workers():
    """Return the gunicorn worker count: two workers per CPU core, plus one."""
    return 2 * multiprocessing.cpu_count() + 1
138
139
class DiagApp(Flask):
    """
    The diagd Flask application: the holder of all mutable diagd state.

    This includes the current Config/IR/EnvoyConfig, the lazily-generated
    Diagnostics, Prometheus metrics, reconfiguration timers/stats, and the
    locks that guard them. setup() must be called once before the app is
    useful; the module-level route handlers read state from the global `app`.
    """

    cache: Optional[Cache]
    ambex_pid: int
    kick: Optional[str]
    estatsmgr: EnvoyStatsMgr
    config_path: Optional[str]
    snapshot_path: str
    bootstrap_path: str
    ads_path: str
    clustermap_path: str
    health_checks: bool
    no_envoy: bool
    debugging: bool
    allow_fs_commands: bool
    report_action_keys: bool
    verbose: bool
    notice_path: str
    logger: logging.Logger  # type: ignore # FIXME(lukeshu): Mypy's probably right on this one, but it'd be a lot of work to fix
    aconf: Config
    ir: Optional[IR]
    econf: Optional[EnvoyConfig]
    # self.diag is actually a property
    _diag: Optional[Diagnostics]
    notices: "Notices"
    scout: Scout
    watcher: "AmbassadorEventWatcher"
    stats_updater: Optional[PeriodicTrigger]
    scout_checker: Optional[PeriodicTrigger]
    timer_logger: Optional[PeriodicTrigger]
    last_request_info: Dict[str, int]
    last_request_time: Optional[datetime.datetime]
    latest_snapshot: str
    banner_endpoint: Optional[str]
    metrics_endpoint: Optional[str]

    # Reconfiguration stats
    reconf_stats: ReconfigStats

    # Custom metrics registry to weed-out default metrics collectors because the
    # default collectors can't be prefixed/namespaced with ambassador_.
    # Using the default metrics collectors would lead to name clashes between the Python and Go instrumentations.
    metrics_registry: CollectorRegistry

    config_lock: threading.Lock
    diag_lock: threading.Lock

    def setup(
        self,
        snapshot_path: str,
        bootstrap_path: str,
        ads_path: str,
        config_path: Optional[str],
        ambex_pid: int,
        kick: Optional[str],
        banner_endpoint: Optional[str],
        metrics_endpoint: Optional[str],
        k8s=False,
        do_checks=True,
        no_envoy=False,
        reload=False,
        debug=False,
        verbose=False,
        notices=None,
        validation_retries=5,
        allow_fs_commands=False,
        local_scout=False,
        report_action_keys=False,
        enable_fast_reconfigure=False,
        clustermap_path=None,
    ):
        """
        One-time initialization of the app's state.

        Must be called exactly once, before any requests are served: the
        route handlers assume the attributes initialized here exist. Sets up
        logging, the Envoy stats manager, metrics, the (optional) IR cache,
        locks, and the Scout client.
        """
        self.health_checks = do_checks
        self.no_envoy = no_envoy
        self.debugging = reload
        self.verbose = verbose
        self.notice_path = notices
        self.notices = Notices(self.notice_path)
        self.notices.reset()
        self.k8s = k8s
        self.validation_retries = validation_retries
        self.allow_fs_commands = allow_fs_commands
        self.local_scout = local_scout
        self.report_action_keys = report_action_keys
        self.banner_endpoint = banner_endpoint
        self.metrics_endpoint = metrics_endpoint
        self.metrics_registry = CollectorRegistry(auto_describe=True)
        self.enable_fast_reconfigure = enable_fast_reconfigure

        # Init logger, inherits settings from default
        self.logger = logging.getLogger("ambassador.diagd")

        # Initialize the Envoy stats manager...
        self.estatsmgr = EnvoyStatsMgr(self.logger)

        # ...and the incremental-reconfigure stats.
        self.reconf_stats = ReconfigStats(self.logger)

        # This will raise an exception and crash if you pass it a string. That's intentional.
        self.ambex_pid = int(ambex_pid)
        self.kick = kick

        # Initialize the cache if we're allowed to.
        if self.enable_fast_reconfigure:
            self.logger.info("AMBASSADOR_FAST_RECONFIGURE enabled, initializing cache")
            self.cache = Cache(self.logger)
        else:
            self.logger.info("AMBASSADOR_FAST_RECONFIGURE disabled, not initializing cache")
            self.cache = None

        # Use Timers to keep some stats on reconfigurations
        self.config_timer = Timer("reconfiguration", self.metrics_registry)
        self.fetcher_timer = Timer("Fetcher", self.metrics_registry)
        self.aconf_timer = Timer("AConf", self.metrics_registry)
        self.ir_timer = Timer("IR", self.metrics_registry)
        self.econf_timer = Timer("EConf", self.metrics_registry)
        self.diag_timer = Timer("Diagnostics", self.metrics_registry)

        # Use gauges to keep some metrics on active config
        # (plain strings here -- the original used f-strings with no
        # placeholders).
        self.diag_errors = Gauge(
            "diagnostics_errors",
            "Number of configuration errors",
            namespace="ambassador",
            registry=self.metrics_registry,
        )
        self.diag_notices = Gauge(
            "diagnostics_notices",
            "Number of configuration notices",
            namespace="ambassador",
            registry=self.metrics_registry,
        )
        self.diag_log_level = Gauge(
            "log_level",
            "Debug log level enabled or not",
            ["level"],
            namespace="ambassador",
            registry=self.metrics_registry,
        )

        if debug:
            self.logger.setLevel(logging.DEBUG)
            self.diag_log_level.labels("debug").set(1)
            logging.getLogger("ambassador").setLevel(logging.DEBUG)
        else:
            self.diag_log_level.labels("debug").set(0)

        # Assume that we will NOT update Mapping status.
        ksclass: Type[KubeStatus] = KubeStatusNoMappings

        if os.environ.get("AMBASSADOR_UPDATE_MAPPING_STATUS", "false").lower() == "true":
            self.logger.info("WILL update Mapping status")
            ksclass = KubeStatus
        else:
            self.logger.info("WILL NOT update Mapping status")

        self.kubestatus = ksclass(self)

        self.config_path = config_path
        self.bootstrap_path = bootstrap_path
        self.ads_path = ads_path
        self.snapshot_path = snapshot_path
        self.clustermap_path = clustermap_path or os.path.join(
            os.path.dirname(self.bootstrap_path), "clustermap.json"
        )

        # You must hold config_lock when updating config elements (including diag!).
        self.config_lock = threading.Lock()

        # When generating new diagnostics, there's a dance around config_lock and
        # diag_lock -- see the diag() property.
        self.diag_lock = threading.Lock()

        # Why are we doing this? Aren't we sure we're singlethreaded here?
        # Well, yes. But self.diag is actually a property, and it will raise an
        # assertion failure if we're not holding self.config_lock... and once
        # the lock is in play at all, we're gonna time it, in case my belief
        # that grabbing the lock here is always effectively free turns out to
        # be wrong.

        with self.config_lock:
            self.ir = None  # don't update unless you hold config_lock
            self.econf = None  # don't update unless you hold config_lock
            self.diag = None  # don't update unless you hold config_lock

        self.stats_updater = None
        self.scout_checker = None

        self.last_request_info = {}
        self.last_request_time = None

        # self.scout = Scout(update_frequency=datetime.timedelta(seconds=10))
        self.scout = Scout(local_only=self.local_scout)

        ProcessCollector(namespace="ambassador", registry=self.metrics_registry)
        metrics_info = Info(
            name="diagnostics",
            namespace="ambassador",
            documentation="Ambassador diagnostic info",
            registry=self.metrics_registry,
        )
        metrics_info.info(
            {
                "version": __version__,
                "ambassador_id": Config.ambassador_id,
                "cluster_id": os.environ.get(
                    "AMBASSADOR_CLUSTER_ID",
                    os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
                ),
                "single_namespace": str(Config.single_namespace),
            }
        )

    @property
    def diag(self) -> Optional[Diagnostics]:
        """
        It turns out to be expensive to generate the Diagnostics class, so
        app.diag is a property that does it on demand, handling Timers and
        the config lock for you.

        You MUST NOT already hold the config_lock or the diag_lock when
        trying to read app.diag.

        You MUST already have loaded an IR.
        """

        # The config_lock is meant to make sure that we don't ever update
        # self.diag in two places at once, so grab that first.
        with self.config_lock:
            # If we've already generated diagnostics...
            if app._diag:
                # ...then we're good to go.
                return app._diag

        # If here, we have _not_ generated diagnostics, and we've dropped the
        # config lock so as not to block anyone else. Next up: grab the diag
        # lock, because we'd rather not have two diag generations happening at
        # once.
        with self.diag_lock:
            # Did someone else sneak in between releasing the config lock and
            # grabbing the diag lock?
            if app._diag:
                # Yup. Use their work.
                return app._diag

            # OK, go generate diagnostics.
            _diag = self._generate_diagnostics()

            # If that didn't work, no point in messing with the config lock.
            if not _diag:
                return None

            # Once here, we need to - once again - grab the config lock to update
            # app._diag. This is safe because this is the only place we ever mess
            # with the diag lock, so nowhere else will try to grab the diag lock
            # while holding the config lock.
            with app.config_lock:
                app._diag = _diag

            # Finally, we can return app._diag to our caller.
            return app._diag

    @diag.setter
    def diag(self, diag: Optional[Diagnostics]) -> None:
        """
        It turns out to be expensive to generate the Diagnostics class, so
        app.diag is a property that does it on demand, handling Timers and
        the config lock for you.

        You MUST already hold the config_lock when trying to update app.diag.
        You MUST NOT hold the diag_lock.
        """
        self._diag = diag

    def _generate_diagnostics(self) -> Optional[Diagnostics]:
        """
        Do the heavy lifting of generating Diagnostics for our current configuration.
        Really, only the diag() property should be calling this method, but if you
        are convinced that you need to call it from elsewhere:

        1. You're almost certainly wrong about needing to call it from elsewhere.
        2. You MUST hold the diag_lock when calling this method.
        3. You MUST NOT hold the config_lock when calling this method.
        4. No, really, you're wrong. Don't call this method from anywhere but the
           diag() property.
        """

        # Make sure we have an IR and econf to work with.
        if not app.ir or not app.econf:
            # Nope, bail.
            return None

        # OK, go ahead and generate diagnostics. Use the diag_timer to time
        # this.
        with self.diag_timer:
            _diag = Diagnostics(app.ir, app.econf)

            # Update some metrics data points given the new generated Diagnostics
            diag_dict = _diag.as_dict()
            self.diag_errors.set(len(diag_dict.get("errors", [])))
            self.diag_notices.set(len(diag_dict.get("notices", [])))

            # Note that we've updated diagnostics, since that might trigger a
            # timer log.
            self.reconf_stats.mark("diag")

            return _diag

    def check_scout(self, what: str) -> None:
        """Post a SCOUT event (with the current IR) to the event watcher."""
        self.watcher.post("SCOUT", (what, self.ir))

    def post_timer_event(self) -> None:
        # Post an event to do a timer check.
        self.watcher.post("TIMER", None)

    def check_timers(self) -> None:
        """
        Log reconfiguration timers and cache stats when they're due, and run
        a cache-consistency check when the ReconfigStats say it's time.
        """

        if self.reconf_stats.needs_timers():
            # OK! Log the timers...

            for t in [
                self.config_timer,
                self.fetcher_timer,
                self.aconf_timer,
                self.ir_timer,
                self.econf_timer,
                self.diag_timer,
            ]:
                if t:
                    self.logger.info(t.summary())

            # ...and the cache statistics, if we can.
            if self.cache:
                self.cache.dump_stats()

            # Always dump the reconfiguration stats...
            self.reconf_stats.dump()

            # ...and mark that the timers have been logged.
            self.reconf_stats.mark_timers_logged()

        # In this case we need to check to see if it's time to do a configuration
        # check, too.
        if self.reconf_stats.needs_check():
            result = False

            try:
                result = self.check_cache()
            except Exception as e:
                tb = "\n".join(traceback.format_exception(*sys.exc_info()))
                self.logger.error("CACHE: CHECK FAILED: %s\n%s" % (e, tb))

            # Mark that the check has happened.
            self.reconf_stats.mark_checked(result)

    @staticmethod
    def json_diff(what: str, j1: str, j2: str) -> str:
        """
        Return a context diff of two JSON strings, labeled as the
        incremental vs. nonincremental versions of `what`.
        """
        output = ""

        l1 = j1.split("\n")
        l2 = j2.split("\n")

        n1 = f"{what} incremental"
        n2 = f"{what} nonincremental"

        output += "\n--------\n"

        for line in difflib.context_diff(l1, l2, fromfile=n1, tofile=n2):
            line = line.rstrip()
            output += line
            output += "\n"

        return output

    def check_cache(self) -> bool:
        """
        Rebuild the IR and econf from the current aconf with a fresh, empty
        cache and compare them against the live (incrementally-maintained)
        versions. On mismatch, write the diffs to a rotated set of snapshot
        files. Returns True if everything matched.
        """
        # We're going to build a shiny new IR and econf from our existing aconf, and make
        # sure everything matches. We will _not_ use the existing cache for this.
        #
        # For this, make sure we have an IR already...
        assert self.aconf
        assert self.ir
        assert self.econf

        # Compute this IR/econf with a new empty cache. It saves a lot of trouble with
        # having to delete cache keys from the JSON.

        self.logger.debug("CACHE: starting check")
        cache = Cache(self.logger)
        scc = SecretHandler(app.logger, "check_cache", app.snapshot_path, "check")
        ir = IR(self.aconf, secret_handler=scc, cache=cache)
        econf = EnvoyConfig.generate(ir, cache=cache)

        # This is testing code.
        # name = list(ir.clusters.keys())[0]
        # del(ir.clusters[name])

        i1 = self.ir.as_json()
        i2 = ir.as_json()

        e1 = self.econf.as_json()
        e2 = econf.as_json()

        result = True
        errors = ""

        if i1 != i2:
            result = False
            self.logger.error("CACHE: IR MISMATCH")
            errors += "IR diffs:\n"
            errors += self.json_diff("IR", i1, i2)

        if e1 != e2:
            result = False
            self.logger.error("CACHE: ENVOY CONFIG MISMATCH")
            errors += "econf diffs:\n"
            # BUG FIX: the original diffed i1/i2 (the IR JSON) under the
            # "econf diffs" label; diff the Envoy configs instead.
            errors += self.json_diff("econf", e1, e2)

        if not result:
            err_path = os.path.join(self.snapshot_path, "diff-tmp.txt")

            # Use a context manager so the diff file is flushed and closed
            # before we rotate the snapshot files below (the original leaked
            # the file handle).
            with open(err_path, "w") as err_file:
                err_file.write(errors)

            snapcount = int(os.environ.get("AMBASSADOR_SNAPSHOT_COUNT", "4"))
            snaplist: List[Tuple[str, str]] = []

            if snapcount > 0:
                # If snapcount is 4, this range statement becomes range(-4, -1)
                # which gives [ -4, -3, -2 ], which the list comprehension turns
                # into [ ( "-3", "-4" ), ( "-2", "-3" ), ( "-1", "-2" ) ]...
                # which is the list of suffixes to rename to rotate the snapshots.

                snaplist += [(str(x + 1), str(x)) for x in range(-1 * snapcount, -1)]

                # After dealing with that, we need to rotate the current file into -1.
                snaplist.append(("", "-1"))

            # Whether or not we do any rotation, we need to cycle in the '-tmp' file.
            snaplist.append(("-tmp", ""))

            for from_suffix, to_suffix in snaplist:
                from_path = os.path.join(app.snapshot_path, "diff{}.txt".format(from_suffix))
                to_path = os.path.join(app.snapshot_path, "diff{}.txt".format(to_suffix))

                try:
                    self.logger.debug("rotate: %s -> %s" % (from_path, to_path))
                    os.rename(from_path, to_path)
                except IOError as e:
                    self.logger.debug("skip %s -> %s: %s" % (from_path, to_path, e))
                except Exception as e:
                    self.logger.debug("could not rename %s -> %s: %s" % (from_path, to_path, e))

        self.logger.info("CACHE: check %s" % ("succeeded" if result else "failed"))

        return result
592
593
# get the "templates" directory, or raise "FileNotFoundError" if not found
def get_templates_dir():
    """
    Locate the Flask "templates" directory.

    First try the installed ambassador distribution (via pkg_resources), then
    fall back to the "templates" directory next to this module's parent.
    Raises FileNotFoundError if no candidate exists on disk.
    """
    res_dir = None
    try:
        # this will fail when not in a distribution
        res_dir = resource_filename(Requirement.parse("ambassador"), "templates")
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # still propagate; any lookup failure just falls through to the
        # source-tree path below.
        pass

    maybe_dirs = [res_dir, os.path.join(os.path.dirname(__file__), "..", "templates")]
    for d in maybe_dirs:
        if d and os.path.isdir(d):
            return d
    raise FileNotFoundError
608
609
# Get the Flask app defined early. Setup happens later.
# (The @app.route decorators below need `app` to exist at import time; the
# real configuration is deferred to DiagApp.setup().)
app = DiagApp(__name__, template_folder=get_templates_dir())
612
613
614######## DECORATORS
615
616
def standard_handler(f):
    """
    Decorator for the public diagnostic endpoints.

    Wraps the handler so that every request gets a unique request ID, START
    and end debug logs with elapsed milliseconds, and a guaranteed Response
    object: if the handler raises, a 500 "server error" response is logged
    and returned instead. Also touches every tvars_cache key so TTL-based
    eviction runs on each request.
    """
    handler_name = getattr(f, "__name__", "<anonymous>")

    @functools.wraps(f)
    def wrapper(*args, **kwds):
        request_id = str(uuid.uuid4()).upper()
        log_prefix = '%s: %s "%s %s"' % (
            request_id,
            request.remote_addr,
            request.method,
            request.path,
        )

        app.logger.debug("%s START" % log_prefix)
        started_at = datetime.datetime.now()
        app.logger.debug("%s handler %s" % (log_prefix, handler_name))

        # Reading every entry forces ExpiringDict to evict anything past its
        # max_age_seconds TTL now, rather than waiting for max_len pressure.
        # Iterate over a snapshot of the keys so eviction can't mutate the
        # dict out from under us.
        for cache_key in list(tvars_cache.keys()):
            tvars_cache.get(cache_key)

        # Assume the worst (an exception inside the handler) up front.
        outcome = "server error"
        status_code = 500
        log_level = logging.ERROR
        response = Response(outcome, status_code)

        try:
            response = f(*args, reqid=request_id, **kwds)
            if not isinstance(response, Response):
                response = Response(f"Invalid handler result {response}", status_code)

            status_code = response.status_code

            if (status_code // 100) == 2:
                log_level = logging.INFO
                outcome = "success"
            else:
                log_level = logging.ERROR
                outcome = "failure"
        except Exception as e:
            app.logger.exception(e)

        elapsed_ms = int(((datetime.datetime.now() - started_at).total_seconds() * 1000) + 0.5)

        app.logger.log(
            log_level, "%s %dms %d %s" % (log_prefix, elapsed_ms, status_code, outcome)
        )

        return response

    return wrapper
669
670
def internal_handler(f):
    """
    Decorator: reject requests where the remote address is not localhost.

    See the docstring for _is_local_request() for important caveats!
    (The original also computed the wrapped function's name here, but never
    used it; that dead local has been removed.)
    """

    @functools.wraps(f)
    def wrapper(*args, **kwds):
        if not _is_local_request():
            return "Forbidden\n", 403
        return f(*args, **kwds)

    return wrapper
685
686
687######## UTILITIES
688
689
def _is_local_request() -> bool:
    """
    Determine if this request originated with localhost.

    We rely on healthcheck_server.go setting the X-Ambassador-Diag-IP header for us
    (and we rely on it overwriting anything that's already there!).

    It might be possible to consider the environment variables SERVER_NAME and
    SERVER_PORT instead, as those are allegedly required by WSGI... but attempting
    to do so in Flask/GUnicorn yielded a worse implementation that was still not
    portable.

    """

    # headers.get returns None when the header is missing, which correctly
    # compares unequal to "127.0.0.1". (The original initialized remote_addr
    # to "" and immediately overwrote it -- a dead assignment.)
    remote_addr: Optional[str] = request.headers.get("X-Ambassador-Diag-IP")

    return remote_addr == "127.0.0.1"
709
710
def _allow_diag_ui() -> bool:
    """
    Decide whether diag UI traffic is allowed, based on the config flags:
    * diagnostics.enabled: enable the diag UI by adding mappings
    * diagnostics.allow_non_local: allow non-local traffic even when the
      diagnostics UI is disabled. Mappings are not added for the diag UI,
      but it is still exposed on the pod IP via the admin port.
    * locality of the request: when diagnostics are disabled and
      allow_non_local is false, only localhost clients are allowed.
    """
    enabled = False
    allow_non_local = False

    ir = app.ir
    if ir:
        diagnostics = ir.ambassador_module.diagnostics
        enabled = diagnostics.get("enabled", False)
        allow_non_local = diagnostics.get("allow_non_local", False)

    # Allow when the UI is enabled, or remote access is explicitly allowed,
    # or the caller is local. (De Morgan of the original's deny condition.)
    return enabled or allow_non_local or _is_local_request()
733
734
class Notices:
    """
    Accumulator for diagnostic notices (dicts with "level" and "message"
    keys), optionally seeded from a local JSON file. The diag UI renders
    these.
    """

    def __init__(self, local_config_path: str) -> None:
        # Path to a JSON file of locally-configured notices; may not exist.
        self.local_path = local_config_path
        self.notices: List[Dict[str, str]] = []

    def reset(self):
        """Replace all accumulated notices with those from the local file (if any)."""
        local_notices: List[Dict[str, str]] = []
        local_data = ""

        try:
            # Context manager so the file handle is always closed (the
            # original leaked it).
            with open(self.local_path, "r") as local_stream:
                local_data = local_stream.read()
            local_notices = parse_json(local_data)
        except OSError:
            # No local-notices file; that's fine.
            pass
        except Exception:
            # The file existed but didn't parse: surface that as a notice.
            # (Narrowed from a bare `except:`.)
            local_notices.append(
                {"level": "ERROR", "message": "bad local notices: %s" % local_data}
            )

        self.notices = local_notices
        # app.logger.info("Notices: after RESET: %s" % dump_json(self.notices))

    def post(self, notice):
        """Append a notice."""
        # app.logger.debug("Notices: POST %s" % notice)
        self.notices.append(notice)
        # app.logger.info("Notices: after POST: %s" % dump_json(self.notices))

    def prepend(self, notice):
        """Insert a notice at the front of the list."""
        # app.logger.debug("Notices: PREPEND %s" % notice)
        self.notices.insert(0, notice)
        # app.logger.info("Notices: after PREPEND: %s" % dump_json(self.notices))

    def extend(self, notices):
        """Append every notice in an iterable."""
        for notice in notices:
            self.post(notice)
771
772
def td_format(td_object):
    """
    Render a timedelta as a human-readable string like "1 day, 2 hours".

    Returns "0s" for durations under one second.
    """
    seconds = int(td_object.total_seconds())
    periods = [
        ("year", 60 * 60 * 24 * 365),
        ("month", 60 * 60 * 24 * 30),
        ("day", 60 * 60 * 24),
        ("hour", 60 * 60),
        ("minute", 60),
        ("second", 1),
    ]

    strings = []
    for period_name, period_seconds in periods:
        # Use >= so exact multiples land in the larger unit: the original
        # used >, which rendered exactly 1 second as "0s" and exactly 60
        # seconds as "60 seconds".
        if seconds >= period_seconds:
            period_value, seconds = divmod(seconds, period_seconds)

            strings.append(
                "%d %s%s" % (period_value, period_name, "" if (period_value == 1) else "s")
            )

    formatted = ", ".join(strings)

    if not formatted:
        formatted = "0s"

    return formatted
799
800
def interval_format(seconds, normal_format, now_message):
    """
    Format a duration: for at least one second, interpolate the
    human-readable form into normal_format; otherwise return now_message.
    """
    if seconds < 1:
        return now_message
    return normal_format % td_format(datetime.timedelta(seconds=seconds))
806
807
def system_info(app):
    """
    Assemble the "system" dict for diagnostics output: version, identity,
    feature flags, uptime, and the environment-check status from the
    watcher (if one has been posted).
    """
    ir = app.ir
    debug_mode = ir.ambassador_module.get("debug_mode", False) if ir else False

    app.logger.debug(f"DEBUG_MODE {debug_mode}")

    # Until the watcher has posted a real environment status, report a
    # placeholder "no configuration loaded" failure.
    status_dict = {"config failure": [False, "no configuration loaded"]}

    env_status = getattr(app.watcher, "env_status", None)
    if env_status:
        status_dict = env_status.to_dict()
        app.logger.debug(f"status_dict {status_dict}")

    return {
        "version": __version__,
        "hostname": SystemInfo.MyHostName,
        "ambassador_id": Config.ambassador_id,
        "ambassador_namespace": Config.ambassador_namespace,
        "single_namespace": Config.single_namespace,
        "knative_enabled": os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "").lower() == "true",
        "statsd_enabled": os.environ.get("STATSD_ENABLED", "").lower() == "true",
        "endpoints_enabled": Config.enable_endpoints,
        "cluster_id": os.environ.get(
            "AMBASSADOR_CLUSTER_ID",
            os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
        ),
        "boot_time": boot_time,
        "hr_uptime": td_format(datetime.datetime.now() - boot_time),
        "latest_snapshot": app.latest_snapshot,
        "env_good": getattr(app.watcher, "env_good", False),
        "env_failures": getattr(app.watcher, "failure_list", ["no IR loaded"]),
        "env_status": status_dict,
        "debug_mode": debug_mode,
    }
847
848
def envoy_status(estats: EnvoyStats):
    """
    Summarize Envoy state: liveness, readiness, and human-readable uptime
    and time-since-last-stats-update strings.
    """
    since_boot = interval_format(estats.time_since_boot(), "%s", "less than a second")

    if estats.time_since_update():
        since_update = interval_format(
            estats.time_since_update(), "%s ago", "within the last second"
        )
    else:
        since_update = "Never updated"

    return {
        "alive": estats.is_alive(),
        "ready": estats.is_ready(),
        "uptime": since_boot,
        "since_update": since_update,
    }
865
866
def drop_serializer_key(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Delete the "serialization" key (if present) in any dictionary passed in and
    return that dictionary. This function is intended to be used as the
    object_hook value for json.load[s].
    """
    if "serialization" in d:
        del d["serialization"]
    return d
875
876
def filter_keys(d: Dict[Any, Any], keys_to_keep):
    """Remove, in place, every key of d that is not listed in keys_to_keep."""
    for unwanted_key in set(d) - set(keys_to_keep):
        del d[unwanted_key]
881
882
def filter_webui(d: Dict[Any, Any]):
    """
    Strip a diagnostics overview dict (in place) down to just the fields the
    web UI consumes, including the nested resolver, route, and cluster
    entries.
    """
    top_level_keys = [
        "system",
        "route_info",
        "source_map",
        "ambassador_resolvers",
        "ambassador_services",
        "envoy_status",
        "cluster_stats",
        "loginfo",
        "errors",
    ]
    filter_keys(d, top_level_keys)

    for resolver in d["ambassador_resolvers"]:
        filter_keys(resolver, ["_source", "kind"])

    for route in d["route_info"]:
        filter_keys(route, ["diag_class", "key", "headers", "precedence", "clusters"])
        for cluster in route["clusters"]:
            filter_keys(cluster, ["_hcolor", "type_label", "service", "weight"])
904
905
@app.route("/_internal/v0/ping", methods=["GET"])
@internal_handler
def handle_ping():
    # Liveness ping for internal callers (localhost-only via internal_handler).
    return "ACK\n", 200
910
911
@app.route("/_internal/v0/features", methods=["GET"])
@internal_handler
def handle_features():
    """Return the feature summary of the current IR as JSON (503 if no IR yet)."""
    # We don't grab config_lock: this is read-only, and a features request
    # racing the very first configure just means the caller retries.
    ir = app.ir

    if ir:
        return jsonify(ir.features()), 200

    app.logger.debug("Features: configuration required first")
    return "Can't do features before configuration", 503
926
927
@app.route("/_internal/v0/watt", methods=["POST"])
@internal_handler
def handle_watt_update():
    """Queue a watt-snapshot reconfiguration (by URL) for the event watcher."""
    url = request.args.get("url", None)

    if url:
        app.logger.debug("Update requested: watt, %s" % url)
        status, info = app.watcher.post("CONFIG", ("watt", url))
        return info, status

    app.logger.error("error: watt update requested with no URL")
    return "error: watt update requested with no URL\n", 400
942
943
@app.route("/_internal/v0/fs", methods=["POST"])
@internal_handler
def handle_fs():
    """Queue a filesystem-based reconfiguration (by path) for the event watcher."""
    path = request.args.get("path", None)

    if path:
        app.logger.debug("Update requested from %s" % path)
        status, info = app.watcher.post("CONFIG_FS", path)
        return info, status

    app.logger.error("error: update requested with no PATH")
    return "error: update requested with no PATH\n", 400
958
959
@app.route("/_internal/v0/events", methods=["GET"])
@internal_handler
def handle_events():
    """Dump the LocalScout event list (400 unless local Scout is enabled)."""
    if not app.local_scout:
        return "Local Scout is not enabled\n", 400
    assert isinstance(app.scout._scout, LocalScout)

    event_dump = [
        (evt["local_scout_timestamp"], evt["mode"], evt["action"], evt)
        for evt in app.scout._scout.events
    ]

    app.logger.debug(f"Event dump {event_dump}")

    return jsonify(event_dump)
974
975
@app.route("/ambassador/v0/favicon.ico", methods=["GET"])
def favicon():
    """Serve favicon.ico from the installed templates directory."""
    return send_from_directory(
        resource_filename(Requirement.parse("ambassador"), "templates"), "favicon.ico"
    )
981
982
@app.route("/ambassador/v0/check_alive", methods=["GET"])
def check_alive():
    """Liveness probe: 200 if Envoy looks alive (per its stats), else 503."""
    status = envoy_status(app.estatsmgr.get_stats())

    if not status["alive"]:
        return "ambassador seems to have died (%s)\n" % status["uptime"], 503

    return "ambassador liveness check OK (%s)\n" % status["uptime"], 200
991
992
@app.route("/ambassador/v0/check_ready", methods=["GET"])
def check_ready():
    """Readiness probe: 200 once config is loaded and Envoy reports ready."""
    if not app.ir:
        return "ambassador waiting for config\n", 503

    status = envoy_status(app.estatsmgr.get_stats())

    if not status["ready"]:
        return "ambassador not ready (%s)\n" % status["since_update"], 503

    return "ambassador readiness check OK (%s)\n" % status["since_update"], 200
1004
1005
@app.route("/ambassador/v0/diag/", methods=["GET"])
@standard_handler
def show_overview(reqid=None):
    """Render the diagnostics overview page.

    Query arguments (all optional):
      - json: if set, return JSON instead of the rendered HTML template.
      - filter: with json, "webui" trims the payload in place via
        filter_webui(); any other key returns just tvars[filter].
      - patch_client: with json, return a JSON-Patch diff against the last
        response sent to that client ID instead of the full document.
    """
    # If we don't have an IR yet, do nothing.
    #
    # We don't bother grabbing the config_lock here because we're not changing
    # anything, and an overview request hitting at exactly the same moment as
    # the first configure is a race anyway. If it fails, that's not a big deal,
    # they can try again.
    if not app.ir:
        app.logger.debug("OV %s - can't do overview before configuration" % reqid)
        return "Can't do overview before configuration", 503

    if not _allow_diag_ui():
        return Response("Not found\n", 404)

    app.logger.debug("OV %s - showing overview" % reqid)

    # Remember that app.diag is a property that can involve some real expense
    # to compute -- we don't want to call it more than once here, so we cache
    # its value.
    diag = app.diag
    assert diag

    if app.verbose:
        app.logger.debug("OV %s: DIAG" % reqid)
        app.logger.debug("%s" % dump_json(diag.as_dict(), pretty=True))

    estats = app.estatsmgr.get_stats()
    ov = diag.overview(request, estats)

    if app.verbose:
        app.logger.debug("OV %s: OV" % reqid)
        app.logger.debug("%s" % dump_json(ov, pretty=True))
        app.logger.debug("OV %s: collecting errors" % reqid)

    # This also posts notices to app.notices as a side effect.
    ddict = collect_errors_and_notices(request, reqid, "overview", diag)

    # Edge Stack only: fetch optional banner HTML from the configured endpoint.
    # Best-effort -- failures are logged and the banner is simply omitted.
    banner_content = None
    if app.banner_endpoint and app.ir and app.ir.edge_stack_allowed:
        try:
            response = requests.get(app.banner_endpoint)
            if response.status_code == 200:
                banner_content = response.text
        except Exception as e:
            app.logger.error("could not get banner_content: %s" % e)

    # Template variables: fixed system/status entries plus everything from the
    # overview and the errors/notices dict.
    tvars = dict(
        system=system_info(app),
        envoy_status=envoy_status(estats),
        loginfo=app.estatsmgr.loginfo,
        notices=app.notices.notices,
        banner_content=banner_content,
        **ov,
        **ddict,
    )

    patch_client = request.args.get("patch_client", None)
    if request.args.get("json", None):
        filter_key = request.args.get("filter", None)

        if filter_key == "webui":
            filter_webui(tvars)
        elif filter_key:
            return jsonify(tvars.get(filter_key, None))

        if patch_client:
            # Assume this is the Admin UI. Recursively drop all "serialization"
            # keys. This avoids leaking secrets and generally makes the
            # snapshot a lot smaller without losing information that the Admin
            # UI cares about. We do this below by setting the object_hook
            # parameter of the json.loads(...) call. We have to use python's
            # json library instead of orjson, because orjson does not support
            # the object_hook feature.

            # Get the previous full representation
            cached_tvars_json = tvars_cache.get(patch_client, dict())
            # Serialize the tvars into a json-string using the same jsonify Flask serializer, then load the json object
            response_content = json.loads(flask_json.dumps(tvars), object_hook=drop_serializer_key)
            # Diff between the previous representation and the current full representation (http://jsonpatch.com/)
            patch = jsonpatch.make_patch(cached_tvars_json, response_content)
            # Save the current full representation in memory
            tvars_cache[patch_client] = response_content

            # Return only the diff
            return Response(patch.to_string(), mimetype="application/json")
        else:
            return jsonify(tvars)
    else:
        app.check_scout("overview")
        return Response(render_template("overview.html", **tvars))
1097
1098
def collect_errors_and_notices(request, reqid, what: str, diag: Diagnostics) -> Dict:
    """Extract errors and notices from diag.as_dict() for rendering.

    Reshapes the error map into (key, message) pairs suitable for the HTML
    templates, posts all notices (plus any loglevel-change warning) to
    app.notices as a side effect, and returns the remaining diag dict with
    the flattened errors under "errors".
    """
    requested_level = request.args.get("loglevel", None)
    level_notice = None

    if requested_level:
        app.logger.debug("%s %s -- requesting loglevel %s" % (what, reqid, requested_level))
        app.diag_log_level.labels("debug").set(1 if requested_level == "debug" else 0)

        if not app.estatsmgr.update_log_levels(time.time(), level=requested_level):
            level_notice = {"level": "WARNING", "message": "Could not update log level!"}
        # else:
        #     return redirect("/ambassador/v0/diag/", code=302)

    # We need to grab errors and notices from diag.as_dict(), process the errors so
    # they work for the HTML rendering, and post the notices to app.notices. Then we
    # return the dict representation that our caller should work with.

    ddict = diag.as_dict()

    # Flatten the error map to (key, message) pairs; the "-global-" pseudo-key
    # renders as an empty key.
    errors = [
        ("" if err_key == "-global-" else err_key, err["error"])
        for err_key, err_list in ddict.pop("errors", {}).items()
        for err in err_list
    ]

    dnotices = ddict.pop("notices", {})

    # Make sure that anything about the loglevel gets folded into this set.
    if level_notice:
        app.notices.prepend(level_notice)

    for notice_key, notice_list in dnotices.items():
        for item in notice_list:
            app.notices.post({"level": "NOTICE", "message": "%s: %s" % (notice_key, item)})

    ddict["errors"] = errors

    return ddict
1144
1145
@app.route("/ambassador/v0/diag/<path:source>", methods=["GET"])
@standard_handler
def show_intermediate(source=None, reqid=None):
    """Render the per-resource diagnostics detail page for 'source'.

    Query arguments (all optional):
      - method, resource: passed through to the template to highlight a
        particular route.
      - json: return JSON instead of HTML; with filter=<key>, return only
        tvars[key].
    """
    # If we don't have an IR yet, do nothing.
    #
    # We don't bother grabbing the config_lock here because we're not changing
    # anything, and an overview request hitting at exactly the same moment as
    # the first configure is a race anyway. If it fails, that's not a big deal,
    # they can try again.
    if not app.ir:
        app.logger.debug(
            "SRC %s - can't do intermediate for %s before configuration" % (reqid, source)
        )
        return "Can't do overview before configuration", 503

    if not _allow_diag_ui():
        return Response("Not found\n", 404)

    app.logger.debug("SRC %s - getting intermediate for '%s'" % (reqid, source))

    # Remember that app.diag is a property that can involve some real expense
    # to compute -- we don't want to call it more than once here, so we cache
    # its value.
    diag = app.diag
    assert diag

    method = request.args.get("method", None)
    resource = request.args.get("resource", None)

    estats = app.estatsmgr.get_stats()
    result = diag.lookup(request, source, estats)
    assert result

    if app.verbose:
        app.logger.debug("RESULT %s" % dump_json(result, pretty=True))

    # This also posts notices to app.notices as a side effect.
    ddict = collect_errors_and_notices(request, reqid, "detail %s" % source, diag)

    tvars = {
        "system": system_info(app),
        "envoy_status": envoy_status(estats),
        "loginfo": app.estatsmgr.loginfo,
        "notices": app.notices.notices,
        "method": method,
        "resource": resource,
        **result,
        **ddict,
    }

    if request.args.get("json", None):
        key = request.args.get("filter", None)

        if key:
            return jsonify(tvars.get(key, None))
        else:
            return jsonify(tvars)
    else:
        app.check_scout("detail: %s" % source)
        return Response(render_template("diag.html", **tvars))
1205
1206
@app.template_filter("sort_by_key")
def sort_by_key(objects):
    """Jinja filter: order a list of dicts by their 'key' entry."""

    def by_key(item):
        return item["key"]

    return sorted(objects, key=by_key)
1210
1211
@app.template_filter("pretty_json")
def pretty_json(obj):
    """Jinja filter: pretty-print obj as JSON, omitting "_"-prefixed keys.

    Works on a shallow copy so the caller's dict is never mutated.
    """
    if isinstance(obj, dict):
        scrubbed = dict(**obj)

        for key in list(scrubbed.keys()):
            if key.startswith("_"):
                del scrubbed[key]

        obj = scrubbed

    return dump_json(obj, pretty=True)
1223
1224
@app.template_filter("sort_clusters_by_service")
def sort_clusters_by_service(clusters):
    """Jinja filter: order a list of cluster dicts by their 'service' entry."""

    def by_service(cluster):
        return cluster["service"]

    return sorted(clusters, key=by_service)
1229
1230
@app.template_filter("source_lookup")
def source_lookup(name, sources):
    """Jinja filter: map a resource name to its '_source' location, falling
    back to the name itself when no source entry exists."""
    app.logger.debug("%s => sources %s" % (name, sources))

    entry = sources.get(name, {})

    app.logger.debug("%s => source %s" % (name, entry))

    return entry.get("_source", name)
1240
1241
@app.route("/metrics", methods=["GET"])
@standard_handler
def get_prometheus_metrics(*args, **kwargs):
    """Serve combined Prometheus metrics: Envoy's stats, our own registry,
    and (Edge Stack only) whatever the configured extra endpoint returns."""
    # Envoy metrics
    envoy_metrics = app.estatsmgr.get_prometheus_stats()

    # Ambassador OSS metrics
    ambassador_metrics = generate_latest(registry=app.metrics_registry).decode("utf-8")

    # Extra metrics endpoint -- best-effort: failures are logged and the
    # extra content is simply omitted.
    extra_metrics_content = ""
    if app.metrics_endpoint and app.ir and app.ir.edge_stack_allowed:
        try:
            resp = requests.get(app.metrics_endpoint)
            if resp.status_code == 200:
                extra_metrics_content = resp.text
        except Exception as e:
            app.logger.error("could not get metrics_endpoint: %s" % e)

    combined = envoy_metrics + ambassador_metrics + extra_metrics_content
    return Response(combined.encode("utf-8"), 200, mimetype="text/plain")
1266
1267
def bool_fmt(b: bool) -> str:
    """Format a (truthy) value compactly as "T" or "F"."""
    return {True: "T", False: "F"}[bool(b)]
1270
1271
class StatusInfo:
    """Accumulates pass/fail detail for one subsystem check.

    'status' starts True and latches False on the first failure;
    'specifics' records every (ok, message) pair in report order.
    """

    def __init__(self) -> None:
        self.status = True
        self.specifics: List[Tuple[bool, str]] = []

    def failure(self, message: str) -> None:
        """Record a failing check and mark the overall status as failed."""
        self.status = False
        self.specifics.append((False, message))

    def OK(self, message: str) -> None:
        """Record a passing check; the overall status is unchanged."""
        self.specifics.append((True, message))

    def to_dict(self) -> Dict[str, Union[bool, List[Tuple[bool, str]]]]:
        """Serialize as {"status": bool, "specifics": [(ok, message), ...]}."""
        return {"status": self.status, "specifics": self.specifics}
1286
1287
class SystemStatus:
    """A set of StatusInfo records keyed by subsystem name, created lazily."""

    def __init__(self) -> None:
        self.status: Dict[str, StatusInfo] = {}

    def failure(self, key: str, message: str) -> None:
        """Record a failure for subsystem 'key'."""
        self.info_for_key(key).failure(message)

    def OK(self, key: str, message: str) -> None:
        """Record a success for subsystem 'key'."""
        self.info_for_key(key).OK(message)

    def info_for_key(self, key) -> StatusInfo:
        """Return the StatusInfo for 'key', creating it on first use."""
        return self.status.setdefault(key, StatusInfo())

    def to_dict(self) -> Dict[str, Dict[str, Union[bool, List[Tuple[bool, str]]]]]:
        """Serialize every subsystem's StatusInfo, keyed by name."""
        return {key: info.to_dict() for key, info in self.status.items()}
1306
1307
class KubeStatus:
    """Tracks and pushes Kubernetes status updates for watched resources.

    'live' records which resources were seen in the latest reconfigure;
    'current_status' caches the last status text posted per resource so
    unchanged updates are skipped. Actual updates run in a process pool
    via the external 'kubestatus' tool.
    """

    pool: concurrent.futures.ProcessPoolExecutor

    def __init__(self, app) -> None:
        self.app = app
        self.logger = app.logger
        self.live: Dict[str, bool] = {}
        self.current_status: Dict[str, str] = {}
        self.pool = concurrent.futures.ProcessPoolExecutor(max_workers=5)

    def mark_live(self, kind: str, name: str, namespace: str) -> None:
        """Flag one resource as present in the current configuration."""
        # self.logger.debug(f"KubeStatus MASTER {os.getpid()}: mark_live {key}")
        self.live[f"{kind}/{name}.{namespace}"] = True

    def prune(self) -> None:
        """Drop cached status for resources not marked live, then reset."""
        stale = [key for key in self.current_status if not self.live.get(key, False)]

        for key in stale:
            # self.logger.debug(f"KubeStatus MASTER {os.getpid()}: prune {key}")
            del self.current_status[key]

        self.live = {}

    def post(self, kind: str, name: str, namespace: str, text: str) -> None:
        """Push a status update for one resource, unless it's unchanged."""
        key = f"{kind}/{name}.{namespace}"

        if self.current_status.get(key, None) == text:
            # self.logger.info(f"KubeStatus MASTER {os.getpid()}: {key} == {text}")
            return

        # self.logger.info(f"KubeStatus MASTER {os.getpid()}: {key} needs {text}")

        # For now we're going to assume that this works.
        self.current_status[key] = text
        fut = self.pool.submit(kubestatus_update, kind, name, namespace, text)
        fut.add_done_callback(kubestatus_update_done)
1351
1352
1353# The KubeStatusNoMappings class clobbers the mark_live() method of the
1354# KubeStatus class, so that updates to Mappings don't actually have any
1355# effect, but updates to Ingress (for example) do.
class KubeStatusNoMappings(KubeStatus):
    """KubeStatus variant in which Mapping status updates have no effect,
    while updates for other kinds (e.g. Ingress) still go through."""

    def mark_live(self, kind: str, name: str, namespace: str) -> None:
        # Deliberate no-op: see the class comment above the base class usage.
        pass

    def post(self, kind: str, name: str, namespace: str, text: str) -> None:
        # There's a path (via IRBaseMapping.check_status) where a Mapping
        # can be added directly to ir.k8s_status_updates, which will come
        # straight here without mark_live being involved -- so short-circuit
        # here for Mappings, too.
        if kind != "Mapping":
            super().post(kind, name, namespace, text)
1370
1371
def kubestatus_update(kind: str, name: str, namespace: str, text: str) -> str:
    """Push a status update for one Kubernetes resource.

    Runs the external 'kubestatus' tool (in a process-pool worker), feeding
    the status JSON on stdin via /dev/fd/0. Always returns a short
    human-readable result string -- including when the subprocess cannot be
    started at all. Previously an OSError (e.g. FileNotFoundError when the
    'kubestatus' binary is missing) escaped into the pool future and was
    silently lost, since kubestatus_update_done never calls f.result().
    """
    cmd = [
        "kubestatus",
        "--cache-dir",
        "/tmp/client-go-http-cache",
        kind,
        name,
        "-n",
        namespace,
        "-u",
        "/dev/fd/0",
    ]
    # print(f"KubeStatus UPDATE {os.getpid()}: running command: {cmd}")

    try:
        rc = subprocess.run(
            cmd,
            input=text.encode("utf-8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            timeout=5,
        )
        if rc.returncode == 0:
            return f"{name}.{namespace}: update OK"
        else:
            return f"{name}.{namespace}: error {rc.returncode}"

    except subprocess.TimeoutExpired as e:
        return f"{name}.{namespace}: timed out\n\n{e.output}"

    except OSError as e:
        # Covers FileNotFoundError (missing binary) and friends; report it
        # like the other failure modes instead of raising into the future.
        return f"{name}.{namespace}: failed to run kubestatus: {e}"
1401
1402
def kubestatus_update_done(f: concurrent.futures.Future) -> None:
    """Completion callback for kubestatus_update futures.

    The result is currently discarded; re-enable the print below to see
    per-update outcomes while debugging.
    """
    # print(f"KubeStatus DONE {os.getpid()}: result {f.result()}")
1406
1407
1408class AmbassadorEventWatcher(threading.Thread):
1409 # The key for 'Actions' is chimed - chimed_ok - env_good. This will make more sense
1410 # if you read through the _load_ir method.
1411
    # Maps the "chimed-last_chime-env_good" T/F key built by chime() to a
    # (scout_action, bypass_scout_cache) pair.
    Actions = {
        "F-F-F": ("unhealthy", True),  # make sure the first chime always gets out
        "F-F-T": ("now-healthy", True),  # make sure the first chime always gets out
        "F-T-F": ("now-unhealthy", True),  # this is actually impossible
        "F-T-T": ("healthy", True),  # this is actually impossible
        "T-F-F": ("unhealthy", False),
        "T-F-T": ("now-healthy", True),
        "T-T-F": ("now-unhealthy", True),
        "T-T-T": ("update", False),
    }

    # Matches a trailing "-<digits>" suffix. NOTE(review): presumably used to
    # spot compressed/rotated names -- no use is visible in this chunk.
    reCompressed = re.compile(r"-\d+$")
1424
    def __init__(self, app: DiagApp) -> None:
        """Set up the watcher as a daemon thread ("AEW") with its event queue.

        Health-chime state starts pessimistic: no chime sent yet, environment
        assumed not-OK ("unhealthy at boot") until check_environment runs.
        """
        super().__init__(name="AEW", daemon=True)
        self.app = app
        self.logger = self.app.logger
        self.events: queue.Queue = queue.Queue()

        self.chimed = False  # Have we ever sent a chime about the environment?
        self.last_chime = False  # What was the status of our last chime? (starts as False)
        self.env_good = False  # Is our environment currently believed to be OK?
        self.failure_list: List[str] = [
            "unhealthy at boot"
        ]  # What's making our environment not OK?
1437
1438 def post(
1439 self, cmd: str, arg: Optional[Union[str, Tuple[str, Optional[IR]], Tuple[str, str]]]
1440 ) -> Tuple[int, str]:
1441 rqueue: queue.Queue = queue.Queue()
1442
1443 self.events.put((cmd, arg, rqueue))
1444
1445 return rqueue.get()
1446
    def update_estats(self) -> None:
        """Refresh cached Envoy stats (called periodically by stats_updater)."""
        self.app.estatsmgr.update()
1449
    def run(self):
        """Main loop of the watcher thread.

        Starts the daily Scout checkin and the 1-second timer-logging
        triggers, then services (cmd, arg, rqueue) events from self.events
        forever. Each handler is expected to answer on rqueue (via _respond)
        so the caller blocked in post() wakes up; the except blocks here
        respond with a 500 if a handler raises instead.
        """
        self.logger.info("starting Scout checker and timer logger")
        self.app.scout_checker = PeriodicTrigger(
            lambda: self.check_scout("checkin"), period=86400
        )  # Yup, one day.
        self.app.timer_logger = PeriodicTrigger(self.app.post_timer_event, period=1)

        self.logger.info("starting event watcher")

        while True:
            cmd, arg, rqueue = self.events.get()
            # self.logger.info("EVENT: %s" % cmd)

            if cmd == "CONFIG_FS":
                # Reconfigure from a filesystem path (legacy / demo mode).
                try:
                    self.load_config_fs(rqueue, arg)
                except Exception as e:
                    self.logger.error("could not reconfigure: %s" % e)
                    self.logger.exception(e)
                    self._respond(rqueue, 500, "configuration from filesystem failed")
            elif cmd == "CONFIG":
                # Reconfigure from a snapshot URL; arg is (version, url) and
                # only the "watt" version is supported.
                version, url = arg

                try:
                    if version == "watt":
                        self.load_config_watt(rqueue, url)
                    else:
                        raise RuntimeError("config from %s not supported" % version)
                except Exception as e:
                    self.logger.error("could not reconfigure: %s" % e)
                    self.logger.exception(e)
                    self._respond(rqueue, 500, "configuration failed")
            elif cmd == "SCOUT":
                # Fire a Scout report; respond first since the caller doesn't
                # need the result.
                try:
                    self._respond(rqueue, 200, "checking Scout")
                    self.check_scout(*arg)
                except Exception as e:
                    self.logger.error("could not reconfigure: %s" % e)
                    self.logger.exception(e)
                    self._respond(rqueue, 500, "scout check failed")
            elif cmd == "TIMER":
                try:
                    self._respond(rqueue, 200, "done")
                    self.app.check_timers()
                except Exception as e:
                    self.logger.error("could not check timers? %s" % e)
                    self.logger.exception(e)
            else:
                self.logger.error(f"unknown event type: '{cmd}' '{arg}'")
                self._respond(rqueue, 400, f"unknown event type '{cmd}' '{arg}'")
1500
1501 def _respond(self, rqueue: queue.Queue, status: int, info="") -> None:
1502 # self.logger.debug("responding to query with %s %s" % (status, info))
1503 rqueue.put((status, info))
1504
1505 # load_config_fs reconfigures from the filesystem. It's _mostly_ legacy
1506 # code, but not entirely, since Docker demo mode still uses it.
1507 #
1508 # BE CAREFUL ABOUT STOPPING THE RECONFIGURATION TIMER ONCE IT IS STARTED.
    def load_config_fs(self, rqueue: queue.Queue, path: str) -> None:
        """Reconfigure from a filesystem path (mostly legacy; Docker demo
        mode still uses it).

        When allow_fs_commands is set, 'path' may instead be a "cmd:<CMD>"
        test command (CHIME, CHIME_RESET, SCOUT_CACHE_RESET, ENV_OK,
        ENV_BAD); those respond on rqueue directly and return early. The
        real reconfigure path starts the config timer and hands off to
        _load_ir, which stops the timer and responds.
        """
        self.logger.debug("loading configuration from disk: %s" % path)

        # The "path" here can just be a path, but it can also be a command for testing,
        # if the user has chosen to allow that.

        if self.app.allow_fs_commands and (":" in path):
            pfx, rest = path.split(":", 1)

            if pfx.lower() == "cmd":
                fields = rest.split(":", 1)

                cmd = fields[0].upper()

                # NOTE(review): 'args' is computed but never used below.
                args = fields[1:] if (len(fields) > 1) else None

                if cmd.upper() == "CHIME":
                    self.logger.info("CMD: Chiming")

                    self.chime()

                    self._respond(rqueue, 200, "Chimed")
                elif cmd.upper() == "CHIME_RESET":
                    self.chimed = False
                    self.last_chime = False
                    self.env_good = False

                    self.app.scout.reset_events()
                    self.app.scout.report(mode="boot", action="boot1", no_cache=True)

                    self.logger.info("CMD: Reset chime state")
                    self._respond(rqueue, 200, "CMD: Reset chime state")
                elif cmd.upper() == "SCOUT_CACHE_RESET":
                    self.app.scout.reset_cache_time()

                    self.logger.info("CMD: Reset Scout cache time")
                    self._respond(rqueue, 200, "CMD: Reset Scout cache time")
                elif cmd.upper() == "ENV_OK":
                    self.env_good = True
                    self.failure_list = []

                    self.logger.info("CMD: Marked environment good")
                    self._respond(rqueue, 200, "CMD: Marked environment good")
                elif cmd.upper() == "ENV_BAD":
                    self.env_good = False
                    self.failure_list = ["failure forced"]

                    self.logger.info("CMD: Marked environment bad")
                    self._respond(rqueue, 200, "CMD: Marked environment bad")
                else:
                    self.logger.info(f'CMD: no such command "{cmd}"')
                    self._respond(rqueue, 400, f'CMD: no such command "{cmd}"')

                return
            else:
                self.logger.info(f'CONFIG_FS: invalid prefix "{pfx}"')
                self._respond(rqueue, 400, f'CONFIG_FS: invalid prefix "{pfx}"')

                return

        # OK, we're starting a reconfiguration. BE CAREFUL TO STOP THE TIMER
        # BEFORE YOU RESPOND TO THE CALLER.
        self.app.config_timer.start()

        # Sanitize the path into a snapshot name.
        snapshot = re.sub(r"[^A-Za-z0-9_-]", "_", path)
        scc = FSSecretHandler(app.logger, path, app.snapshot_path, "0")

        with self.app.fetcher_timer:
            aconf = Config()
            fetcher = ResourceFetcher(app.logger, aconf)
            fetcher.load_from_filesystem(path, k8s=app.k8s, recurse=True)

        if not fetcher.elements:
            self.logger.debug("no configuration resources found at %s" % path)
            # Don't bail from here -- go ahead and reload the IR.
            #
            # XXX This is basically historical logic, honestly. But if you try
            # to respond from here and bail, STOP THE RECONFIGURATION TIMER.

        self._load_ir(rqueue, aconf, fetcher, scc, snapshot)
1589
1590 # load_config_watt reconfigures from the filesystem. It's the one true way of
1591 # reconfiguring these days.
1592 #
1593 # BE CAREFUL ABOUT STOPPING THE RECONFIGURATION TIMER ONCE IT IS STARTED.
    def load_config_watt(self, rqueue: queue.Queue, url: str):
        """Reconfigure from a watt snapshot URL (the normal reconfigure path).

        Fetches the snapshot (saving a copy to snapshot-tmp.yaml), parses it,
        and hands off to _load_ir. The reconfiguration timer is started here
        and must be stopped (and rqueue answered) before this flow finishes;
        _load_ir takes care of both.
        """
        # The snapshot name is the last path component of the URL.
        snapshot = url.split("/")[-1]
        ss_path = os.path.join(app.snapshot_path, "snapshot-tmp.yaml")

        # OK, we're starting a reconfiguration. BE CAREFUL TO STOP THE TIMER
        # BEFORE YOU RESPOND TO THE CALLER.
        self.app.config_timer.start()

        self.logger.debug("copying configuration: watt, %s to %s" % (url, ss_path))

        # Grab the serialization, and save it to disk too.
        serialization = load_url_contents(self.logger, url, stream2=open(ss_path, "w"))

        if not serialization:
            self.logger.debug("no data loaded from snapshot %s" % snapshot)
            # We never used to return here. I'm not sure if that's really correct?
            #
            # IF YOU CHANGE THIS, BE CAREFUL TO STOP THE RECONFIGURATION TIMER.

        # Weirdly, we don't need a special WattSecretHandler: parse_watt knows how to handle
        # the secrets that watt sends.
        scc = SecretHandler(app.logger, url, app.snapshot_path, snapshot)

        # OK. Time the various configuration sections separately.

        with self.app.fetcher_timer:
            aconf = Config()
            fetcher = ResourceFetcher(app.logger, aconf)

            if serialization:
                fetcher.parse_watt(serialization)

        if not fetcher.elements:
            self.logger.debug("no configuration found in snapshot %s" % snapshot)

            # Don't actually bail here. If they send over a valid config that happens
            # to have nothing for us, it's still a legit config.
            #
            # IF YOU CHANGE THIS, BE CAREFUL TO STOP THE RECONFIGURATION TIMER.

        self._load_ir(rqueue, aconf, fetcher, scc, snapshot)
1635
1636 # _load_ir is where the heavy lifting of a reconfigure happens.
1637 #
1638 # AT THE POINT OF ENTRY, THE RECONFIGURATION TIMER IS RUNNING. DO NOT LEAVE
1639 # THIS METHOD WITHOUT STOPPING THE RECONFIGURATION TIMER.
    def _load_ir(
        self,
        rqueue: queue.Queue,
        aconf: Config,
        fetcher: ResourceFetcher,
        secret_handler: SecretHandler,
        snapshot: str,
    ) -> None:
        """Heavy lifting of a reconfigure: build IR and Envoy config, validate,
        rotate snapshots, swap the new config into the app, notify Envoy, push
        k8s status updates, and finally chime.

        AT THE POINT OF ENTRY, THE RECONFIGURATION TIMER IS RUNNING -- every
        exit path must stop it, and must answer rqueue exactly once.
        """
        with self.app.aconf_timer:
            aconf.load_all(fetcher.sorted())

            # TODO(Flynn): This is an awful hack. Have aconf.load(fetcher) that does
            # this correctly.
            #
            # I'm not doing this at this moment because aconf.load_all() is called in a
            # lot of places, and I don't want to destablize 2.2.2.
            aconf.load_invalid(fetcher)

        # Save the aconf to the -tmp slot; it's rotated into place below.
        aconf_path = os.path.join(app.snapshot_path, "aconf-tmp.json")
        open(aconf_path, "w").write(aconf.as_json())

        # OK. What kind of reconfiguration are we doing?
        config_type, reset_cache, invalidate_groups_for = IR.check_deltas(
            self.logger, fetcher, self.app.cache
        )

        if reset_cache:
            self.logger.debug("RESETTING CACHE")
            self.app.cache = Cache(self.logger)

        with self.app.ir_timer:
            ir = IR(
                aconf,
                secret_handler=secret_handler,
                invalidate_groups_for=invalidate_groups_for,
                cache=self.app.cache,
            )

        ir_path = os.path.join(app.snapshot_path, "ir-tmp.json")
        open(ir_path, "w").write(ir.as_json())

        with self.app.econf_timer:
            self.logger.debug("generating envoy configuration")
            econf = EnvoyConfig.generate(ir, cache=self.app.cache)

        # DON'T generate the Diagnostics here, because that turns out to be expensive.
        # Instead, we'll just reset app.diag to None, then generate it on-demand when
        # we need it.
        #
        # DO go ahead and split the Envoy config into its components for later, though.
        bootstrap_config, ads_config, clustermap = econf.split_config()

        # OK. Assume that the Envoy config is valid...
        econf_is_valid = True
        econf_bad_reason = ""

        # ...then look for reasons it's not valid.
        if not econf.has_listeners():
            # No listeners == something in the Ambassador config is totally horked.
            # Probably this is the user not defining any Hosts that match
            # the Listeners in the system.
            #
            # As it happens, Envoy is OK running a config with no listeners, and it'll
            # answer on port 8001 for readiness checks, so... log a notice, but run with it.
            self.logger.warning(
                "No active listeners at all; check your Listener and Host configuration"
            )
        elif not self.validate_envoy_config(
            ir, config=ads_config, retries=self.app.validation_retries
        ):
            # Invalid Envoy config probably indicates a bug in Emissary itself. Sigh.
            econf_is_valid = False
            econf_bad_reason = "invalid envoy configuration generated"

        # OK. Is the config invalid?
        if not econf_is_valid:
            # BZzzt. Don't post this update.
            self.logger.info(
                "no update performed (%s), continuing with current configuration..."
                % econf_bad_reason
            )

            # Don't use app.check_scout; it will deadlock.
            self.check_scout("attempted bad update")

            # DO stop the reconfiguration timer before leaving.
            self.app.config_timer.stop()
            self._respond(
                rqueue, 500, "ignoring (%s) in snapshot %s" % (econf_bad_reason, snapshot)
            )
            return

        snapcount = int(os.environ.get("AMBASSADOR_SNAPSHOT_COUNT", "4"))
        snaplist: List[Tuple[str, str]] = []

        if snapcount > 0:
            self.logger.debug("rotating snapshots for snapshot %s" % snapshot)

            # If snapcount is 4, this range statement becomes range(-4, -1)
            # which gives [ -4, -3, -2 ], which the list comprehension turns
            # into [ ( "-3", "-4" ), ( "-2", "-3" ), ( "-1", "-2" ) ]...
            # which is the list of suffixes to rename to rotate the snapshots.

            snaplist += [(str(x + 1), str(x)) for x in range(-1 * snapcount, -1)]

            # After dealing with that, we need to rotate the current file into -1.
            snaplist.append(("", "-1"))

        # Whether or not we do any rotation, we need to cycle in the '-tmp' file.
        snaplist.append(("-tmp", ""))

        for from_suffix, to_suffix in snaplist:
            for fmt in ["aconf{}.json", "econf{}.json", "ir{}.json", "snapshot{}.yaml"]:
                from_path = os.path.join(app.snapshot_path, fmt.format(from_suffix))
                to_path = os.path.join(app.snapshot_path, fmt.format(to_suffix))

                # Make sure we don't leave this method on error! The reconfiguration
                # timer is still running, but also, the snapshots are a debugging aid:
                # if we can't rotate them, meh, whatever.

                try:
                    self.logger.debug("rotate: %s -> %s" % (from_path, to_path))
                    os.rename(from_path, to_path)
                except IOError as e:
                    self.logger.debug("skip %s -> %s: %s" % (from_path, to_path, e))
                except Exception as e:
                    self.logger.debug("could not rename %s -> %s: %s" % (from_path, to_path, e))

        app.latest_snapshot = snapshot
        self.logger.debug("saving Envoy configuration for snapshot %s" % snapshot)

        with open(app.bootstrap_path, "w") as output:
            output.write(dump_json(bootstrap_config, pretty=True))

        with open(app.ads_path, "w") as output:
            output.write(dump_json(ads_config, pretty=True))

        with open(app.clustermap_path, "w") as output:
            output.write(dump_json(clustermap, pretty=True))

        # Swap the new configuration into the app atomically with respect to
        # anything that grabs config_lock.
        with app.config_lock:
            app.aconf = aconf
            app.ir = ir
            app.econf = econf

            # Force app.diag to None so that it'll be regenerated on-demand.
            app.diag = None

        # We're finally done with the whole configuration process.
        self.app.config_timer.stop()

        # Tell Envoy to pick up the new config: either run the configured
        # kick command, or SIGHUP ambex.
        if app.kick:
            self.logger.debug("running '%s'" % app.kick)
            os.system(app.kick)
        elif app.ambex_pid != 0:
            self.logger.debug("notifying PID %d ambex" % app.ambex_pid)
            os.kill(app.ambex_pid, signal.SIGHUP)

        # don't worry about TCPMappings yet
        mappings = app.aconf.get_config("mappings")

        if mappings:
            for mapping_name, mapping in mappings.items():
                app.kubestatus.mark_live(
                    "Mapping", mapping_name, mapping.get("namespace", Config.ambassador_namespace)
                )

        app.kubestatus.prune()

        # Push any k8s status updates the IR accumulated.
        if app.ir.k8s_status_updates:
            update_count = 0

            for name in app.ir.k8s_status_updates.keys():
                update_count += 1
                # Strip off any namespace in the name.
                resource_name = name.split(".", 1)[0]
                kind, namespace, update = app.ir.k8s_status_updates[name]
                text = dump_json(update)

                # self.logger.debug(f"K8s status update: {kind} {resource_name}.{namespace}, {text}...")

                app.kubestatus.post(kind, resource_name, namespace, text)

        group_count = len(app.ir.groups)
        cluster_count = len(app.ir.clusters)
        listener_count = len(app.ir.listeners)
        service_count = len(app.ir.services)

        # Wake up the caller blocked in post(); the reconfigure succeeded.
        self._respond(
            rqueue, 200, "configuration updated (%s) from snapshot %s" % (config_type, snapshot)
        )

        self.logger.info(
            "configuration updated (%s) from snapshot %s (S%d L%d G%d C%d)"
            % (config_type, snapshot, service_count, listener_count, group_count, cluster_count)
        )

        # Remember that we've reconfigured.
        self.app.reconf_stats.mark(config_type)

        if app.health_checks and not app.stats_updater:
            app.logger.debug("starting Envoy status updater")
            app.stats_updater = PeriodicTrigger(app.watcher.update_estats, period=5)

        # Check our environment...
        self.check_environment()

        self.chime()
1848
    def chime(self):
        """Send a Scout report reflecting current environment health.

        The (chimed, last_chime, env_good) flags are folded into a T/F key
        that the Actions table maps to a (scout_action, no_cache) pair; the
        flags are then updated for the next chime.
        """
        # In general, our reports here should be action "update", and they should honor the
        # Scout cache, but we need to tweak that depending on whether we've done this before
        # and on whether the environment looks OK.

        already_chimed = bool_fmt(self.chimed)
        was_ok = bool_fmt(self.last_chime)
        now_ok = bool_fmt(self.env_good)

        # Poor man's state machine...
        action_key = f"{already_chimed}-{was_ok}-{now_ok}"
        action, no_cache = AmbassadorEventWatcher.Actions[action_key]

        self.logger.debug(f"CHIME: {action_key}")

        class ChimeArgs(TypedDict):
            no_cache: NotRequired[Optional[bool]]
            ir: NotRequired[Optional[IR]]
            failures: NotRequired[Optional[List[str]]]
            action_key: NotRequired[Optional[str]]

        chime_args: ChimeArgs = {"no_cache": no_cache, "failures": self.failure_list}

        if self.app.report_action_keys:
            chime_args["action_key"] = action_key

        # Don't use app.check_scout; it will deadlock.
        self.check_scout(action, **chime_args)

        # Remember that we have now chimed...
        self.chimed = True

        # ...and remember what we sent for that chime.
        self.last_chime = self.env_good
1883
    def check_environment(self, ir: Optional[IR] = None) -> None:
        """Assess whether the current configuration looks healthy.

        Scans the active IR (or the one passed in) for config errors, active
        TLSContexts, and non-internal Mappings, then records the verdict in
        self.env_good / self.env_status / self.failure_list for chime().
        """
        env_good = True
        chime_failures = {}
        env_status = SystemStatus()

        error_count = 0
        tls_count = 0
        mapping_count = 0

        if not ir:
            ir = app.ir

        if not ir:
            chime_failures["no config loaded"] = True
            env_good = False
        else:
            if not ir.aconf:
                chime_failures["completely empty config"] = True
                env_good = False
            else:
                for err_key, err_list in ir.aconf.errors.items():
                    if err_key == "-global-":
                        err_key = ""

                    for err in err_list:
                        error_count += 1
                        err_text = err["error"]

                        self.app.logger.info(f"error {err_key} {err_text}")

                        if err_text.find("CRD") >= 0:
                            if err_text.find("core") >= 0:
                                chime_failures["core CRDs"] = True
                                env_status.failure("CRDs", "Core CRD type definitions are missing")
                            else:
                                chime_failures["other CRDs"] = True
                                env_status.failure(
                                    "CRDs", "Resolver CRD type definitions are missing"
                                )

                            env_good = False
                        elif err_text.find("TLS") >= 0:
                            chime_failures["TLS errors"] = True
                            env_status.failure("TLS", err_text)

                            env_good = False

            # NOTE(review): the break means tls_count is only ever 0 or 1 --
            # enough for the boolean health check below, though the OK
            # message will report "1" even when more contexts are active.
            for context in ir.tls_contexts:
                if context:
                    tls_count += 1
                    break

            # Count "user" Mappings, skipping Emissary's own internal
            # /ambassador/v0 diagnostics Mappings.
            for group in ir.groups.values():
                for mapping in group.mappings:
                    pfx = mapping.get("prefix", None)
                    name = mapping.get("name", None)

                    if pfx:
                        # NOTE(review): assumes name is non-None whenever pfx
                        # is set -- confirm against IR mapping invariants.
                        if not pfx.startswith("/ambassador/v0") or not name.startswith("internal_"):
                            mapping_count += 1

        if error_count:
            env_status.failure(
                "Error check",
                f'{error_count} total error{"" if (error_count == 1) else "s"} logged',
            )
            env_good = False
        else:
            env_status.OK("Error check", "No errors logged")

        if tls_count:
            env_status.OK(
                "TLS", f'{tls_count} TLSContext{" is" if (tls_count == 1) else "s are"} active'
            )
        else:
            chime_failures["no TLS contexts"] = True
            env_status.failure("TLS", "No TLSContexts are active")

            env_good = False

        if mapping_count:
            env_status.OK(
                "Mappings",
                f'{mapping_count} Mapping{" is" if (mapping_count == 1) else "s are"} active',
            )
        else:
            chime_failures["no Mappings"] = True
            env_status.failure("Mappings", "No Mappings are active")
            env_good = False

        failure_list: List[str] = []

        if not env_good:
            failure_list = list(sorted(chime_failures.keys()))

        self.env_good = env_good
        self.env_status = env_status
        self.failure_list = failure_list
1982
1983 def check_scout(
1984 self,
1985 what: str,
1986 no_cache: Optional[bool] = False,
1987 ir: Optional[IR] = None,
1988 failures: Optional[List[str]] = None,
1989 action_key: Optional[str] = None,
1990 ) -> None:
1991 now = datetime.datetime.now()
1992 uptime = now - boot_time
1993 hr_uptime = td_format(uptime)
1994
1995 if not ir:
1996 ir = app.ir
1997
1998 self.app.notices.reset()
1999
2000 scout_args = {"uptime": int(uptime.total_seconds()), "hr_uptime": hr_uptime}
2001
2002 if failures:
2003 scout_args["failures"] = failures
2004
2005 if action_key:
2006 scout_args["action_key"] = action_key
2007
2008 if ir:
2009 self.app.logger.debug("check_scout: we have an IR")
2010
2011 if not os.environ.get("AMBASSADOR_DISABLE_FEATURES", None):
2012 self.app.logger.debug("check_scout: including features")
2013 feat = ir.features()
2014
2015 # Include features about the cache and incremental reconfiguration,
2016 # too.
2017
2018 if self.app.cache is not None:
2019 # Fast reconfigure is on. Supply the real info.
2020 feat["frc_enabled"] = True
2021 feat["frc_cache_hits"] = self.app.cache.hits
2022 feat["frc_cache_misses"] = self.app.cache.misses
2023 feat["frc_inv_calls"] = self.app.cache.invalidate_calls
2024 feat["frc_inv_objects"] = self.app.cache.invalidated_objects
2025 else:
2026 # Fast reconfigure is off.
2027 feat["frc_enabled"] = False
2028
2029 # Whether the cache is on or off, we can talk about reconfigurations.
2030 feat["frc_incr_count"] = self.app.reconf_stats.counts["incremental"]
2031 feat["frc_complete_count"] = self.app.reconf_stats.counts["complete"]
2032 feat["frc_check_count"] = self.app.reconf_stats.checks
2033 feat["frc_check_errors"] = self.app.reconf_stats.errors
2034
2035 request_data = app.estatsmgr.get_stats().requests
2036
2037 if request_data:
2038 self.app.logger.debug("check_scout: including requests")
2039
2040 for rkey in request_data.keys():
2041 cur = request_data[rkey]
2042 prev = app.last_request_info.get(rkey, 0)
2043 feat[f"request_{rkey}_count"] = max(cur - prev, 0)
2044
2045 lrt = app.last_request_time or boot_time
2046 since_lrt = now - lrt
2047 elapsed = since_lrt.total_seconds()
2048 hr_elapsed = td_format(since_lrt)
2049
2050 app.last_request_time = now
2051 app.last_request_info = request_data
2052
2053 feat["request_elapsed"] = elapsed
2054 feat["request_hr_elapsed"] = hr_elapsed
2055
2056 scout_args["features"] = feat
2057
2058 scout_result = self.app.scout.report(
2059 mode="diagd", action=what, no_cache=no_cache, **scout_args
2060 )
2061 scout_notices = scout_result.pop("notices", [])
2062
2063 global_loglevel = self.app.logger.getEffectiveLevel()
2064
2065 self.app.logger.debug(f"Scout section: global loglevel {global_loglevel}")
2066
2067 for notice in scout_notices:
2068 notice_level_name = notice.get("level") or "INFO"
2069 notice_level = logging.getLevelName(notice_level_name)
2070
2071 if notice_level >= global_loglevel:
2072 self.app.logger.debug(f"Scout section: include {notice}")
2073 self.app.notices.post(notice)
2074 else:
2075 self.app.logger.debug(f"Scout section: skip {notice}")
2076
2077 self.app.logger.debug("Scout reports %s" % dump_json(scout_result))
2078 self.app.logger.debug("Scout notices: %s" % dump_json(scout_notices))
2079 self.app.logger.debug("App notices after scout: %s" % dump_json(app.notices.notices))
2080
2081 def validate_envoy_config(self, ir: IR, config, retries) -> bool:
2082 if self.app.no_envoy:
2083 self.app.logger.debug("Skipping validation")
2084 return True
2085
2086 # We want to keep the original config untouched
2087 validation_config = copy.deepcopy(config)
2088
2089 # Envoy fails to validate with @type field in envoy config, so removing that
2090 validation_config.pop("@type")
2091
2092 if os.environ.get("AMBASSADOR_DEBUG_CLUSTER_CONFIG", "false").lower() == "true":
2093 vconf_clusters = validation_config["static_resources"]["clusters"]
2094
2095 if len(vconf_clusters) > 10:
2096 vconf_clusters.append(copy.deepcopy(vconf_clusters[10]))
2097
2098 # Check for cluster-name weirdness.
2099 _v2_clusters = {}
2100 _problems = []
2101
2102 for name in sorted(ir.clusters.keys()):
2103 if AmbassadorEventWatcher.reCompressed.search(name):
2104 _problems.append(f"IR pre-compressed cluster {name}")
2105
2106 for cluster in validation_config["static_resources"]["clusters"]:
2107 name = cluster["name"]
2108
2109 if name in _v2_clusters:
2110 _problems.append(f"V2 dup cluster {name}")
2111 _v2_clusters[name] = True
2112
2113 if _problems:
2114 self.logger.error("ENVOY CONFIG PROBLEMS:\n%s", "\n".join(_problems))
2115 stamp = datetime.datetime.now().isoformat()
2116
2117 bad_snapshot = open(
2118 os.path.join(app.snapshot_path, "snapshot-tmp.yaml"), "r"
2119 ).read()
2120
2121 cache_dict: Dict[str, Any] = {}
2122 cache_links: Dict[str, Any] = {}
2123
2124 if self.app.cache:
2125 for k, c in self.app.cache.cache.items():
2126 v: Any = c[0]
2127
2128 if getattr(v, "as_dict", None):
2129 v = v.as_dict()
2130
2131 cache_dict[k] = v
2132
2133 cache_links = {k: list(v) for k, v in self.app.cache.links.items()}
2134
2135 bad_dict = {
2136 "ir": ir.as_dict(),
2137 "v2": config,
2138 "validation": validation_config,
2139 "problems": _problems,
2140 "snapshot": bad_snapshot,
2141 "cache": cache_dict,
2142 "links": cache_links,
2143 }
2144
2145 bad_dict_str = dump_json(bad_dict, pretty=True)
2146 with open(os.path.join(app.snapshot_path, f"problems-{stamp}.json"), "w") as output:
2147 output.write(bad_dict_str)
2148
2149 config_json = dump_json(validation_config, pretty=True)
2150
2151 econf_validation_path = os.path.join(app.snapshot_path, "econf-tmp.json")
2152
2153 with open(econf_validation_path, "w") as output:
2154 output.write(config_json)
2155
2156 command = [
2157 "envoy",
2158 "--service-node",
2159 "test-id",
2160 "--service-cluster",
2161 ir.ambassador_nodename,
2162 "--config-path",
2163 econf_validation_path,
2164 "--mode",
2165 "validate",
2166 ]
2167
2168 v_exit = 0
2169 v_encoded = "".encode("utf-8")
2170
2171 # Try to validate the Envoy config. Short circuit and fall through
2172 # immediately on concrete success or failure, and retry (up to the
2173 # limit) on timeout.
2174 #
2175 # The default timeout is 5s, but this can be overridden in the Ambassador
2176 # module.
2177
2178 amod = ir.ambassador_module
2179 timeout = amod.envoy_validation_timeout if amod else IRAmbassador.default_validation_timeout
2180
2181 # If the timeout is zero, don't do the validation.
2182 if timeout == 0:
2183 self.logger.debug("not validating Envoy configuration since timeout is 0")
2184 return True
2185
2186 self.logger.debug(f"validating Envoy configuration with timeout {timeout}")
2187
2188 for retry in range(retries):
2189 try:
2190 v_encoded = subprocess.check_output(
2191 command, stderr=subprocess.STDOUT, timeout=timeout
2192 )
2193 v_exit = 0
2194 break
2195 except subprocess.CalledProcessError as e:
2196 v_exit = e.returncode
2197 v_encoded = e.output
2198 break
2199 except subprocess.TimeoutExpired as e:
2200 v_exit = 1
2201 v_encoded = e.output or "".encode("utf-8")
2202
2203 self.logger.warn(
2204 "envoy configuration validation timed out after {} seconds{}\n{}".format(
2205 timeout,
2206 ", retrying..." if retry < retries - 1 else "",
2207 v_encoded.decode("utf-8"),
2208 )
2209 )
2210
2211 # Don't break here; continue on to the next iteration of the loop.
2212
2213 if v_exit == 0:
2214 self.logger.debug(
2215 "successfully validated the resulting envoy configuration, continuing..."
2216 )
2217 return True
2218
2219 v_str = typecast(str, v_encoded)
2220
2221 try:
2222 v_str = v_encoded.decode("utf-8")
2223 except:
2224 pass
2225
2226 self.logger.error(
2227 "{}\ncould not validate the envoy configuration above after {} retries, failed with error \n{}\n(exit code {})\nAborting update...".format(
2228 config_json, retries, v_str, v_exit
2229 )
2230 )
2231 return False
2232
2233
class StandaloneApplication(gunicorn.app.base.BaseApplication):
    """Gunicorn application wrapper for the diagd Flask app.

    Feeds the supplied options dict into gunicorn's configuration, sends the
    "boot1" Scout chime at construction time, and starts the
    AmbassadorEventWatcher when gunicorn loads the application.
    """

    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

        # Boot chime. This is basically the earliest point at which we can consider an Ambassador
        # to be "running".
        scout_result = self.application.scout.report(mode="boot", action="boot1", no_cache=True)
        self.application.logger.debug(f"BOOT: Scout result {dump_json(scout_result)}")
        self.application.logger.info(f"Ambassador {__version__} booted")

    def load_config(self):
        # Copy only the options that gunicorn actually understands (and that
        # carry a real value) into gunicorn's own settings.
        config = {
            key: value
            for key, value in self.options.items()
            if key in self.cfg.settings and value is not None
        }

        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        # This is a little weird, but whatever.
        self.application.watcher = AmbassadorEventWatcher(self.application)
        self.application.watcher.start()

        if self.application.config_path:
            self.application.watcher.post("CONFIG_FS", self.application.config_path)

        return self.application
2267
2268
@click.command()
@click.argument("snapshot-path", type=click.Path(), required=False)
@click.argument("bootstrap-path", type=click.Path(), required=False)
@click.argument("ads-path", type=click.Path(), required=False)
@click.option(
    "--config-path",
    type=click.Path(),
    help="Optional configuration path to scan for Ambassador YAML files",
)
@click.option(
    "--k8s",
    is_flag=True,
    help="If True, assume config_path contains Kubernetes resources (only relevant with config_path)",
)
@click.option(
    "--ambex-pid",
    type=int,
    default=0,
    help="Optional PID to signal with HUP after updating Envoy configuration",
    show_default=True,
)
@click.option("--kick", type=str, help="Optional command to run after updating Envoy configuration")
@click.option(
    "--banner-endpoint",
    type=str,
    default="http://127.0.0.1:8500/banner",
    help="Optional endpoint of extra banner to include",
    show_default=True,
)
@click.option(
    "--metrics-endpoint",
    type=str,
    default="http://127.0.0.1:8500/metrics",
    help="Optional endpoint of extra prometheus metrics to include",
    show_default=True,
)
@click.option("--no-checks", is_flag=True, help="If True, don't do Envoy-cluster health checking")
@click.option("--no-envoy", is_flag=True, help="If True, don't interact with Envoy at all")
@click.option("--reload", is_flag=True, help="If True, run Flask in debug mode for live reloading")
@click.option("--debug", is_flag=True, help="If True, do debug logging")
@click.option(
    "--dev-magic",
    is_flag=True,
    help="If True, override a bunch of things for Datawire dev-loop stuff",
)
@click.option("--verbose", is_flag=True, help="If True, do really verbose debug logging")
@click.option(
    "--workers", type=int, help="Number of workers; default is based on the number of CPUs present"
)
@click.option("--host", type=str, help="Interface on which to listen")
@click.option("--port", type=int, default=-1, help="Port on which to listen", show_default=True)
@click.option("--notices", type=click.Path(), help="Optional file to read for local notices")
@click.option(
    "--validation-retries",
    type=int,
    default=5,
    help="Number of times to retry Envoy configuration validation after a timeout",
    show_default=True,
)
@click.option(
    "--allow-fs-commands",
    is_flag=True,
    help="If true, allow CONFIG_FS to support debug/testing commands",
)
@click.option(
    "--local-scout",
    is_flag=True,
    help="Don't talk to remote Scout at all; keep everything purely local",
)
@click.option("--report-action-keys", is_flag=True, help="Report action keys when chiming")
def main(
    snapshot_path=None,
    bootstrap_path=None,
    ads_path=None,
    *,
    dev_magic=False,
    config_path=None,
    ambex_pid=0,
    kick=None,
    banner_endpoint="http://127.0.0.1:8500/banner",
    metrics_endpoint="http://127.0.0.1:8500/metrics",
    k8s=False,
    no_checks=False,
    no_envoy=False,
    reload=False,
    debug=False,
    verbose=False,
    workers=None,
    port=-1,
    host="",
    notices=None,
    validation_retries=5,
    allow_fs_commands=False,
    local_scout=False,
    report_action_keys=False,
):
    """
    Run the diagnostic daemon.

    Arguments:

    SNAPSHOT_PATH
        Path to directory in which to save configuration snapshots and dynamic secrets

    BOOTSTRAP_PATH
        Path to which to write bootstrap Envoy configuration

    ADS_PATH
        Path to which to write ADS Envoy configuration
    """

    # Fast reconfigure (on by default) moves diagd to an alternate port bound
    # only to localhost; the defaults below depend on it.
    enable_fast_reconfigure = parse_bool(os.environ.get("AMBASSADOR_FAST_RECONFIGURE", "true"))

    if port < 0:
        port = Constants.DIAG_PORT if not enable_fast_reconfigure else Constants.DIAG_PORT_ALT
        # port = Constants.DIAG_PORT

    if not host:
        host = "0.0.0.0" if not enable_fast_reconfigure else "127.0.0.1"

    if dev_magic:
        # Override the world: point Scout at a local stub, skip Envoy entirely,
        # and route all paths under /tmp for a self-contained dev loop.
        os.environ["SCOUT_HOST"] = "127.0.0.1:9999"
        os.environ["SCOUT_HTTPS"] = "no"

        no_checks = True
        no_envoy = True

        os.makedirs("/tmp/snapshots", mode=0o755, exist_ok=True)

        snapshot_path = "/tmp/snapshots"
        bootstrap_path = "/tmp/boot.json"
        ads_path = "/tmp/ads.json"

        port = 9998

        allow_fs_commands = True
        local_scout = True
        report_action_keys = True

    # Without Envoy there is nothing to health-check.
    if no_envoy:
        no_checks = True

    # Create the application itself.
    # NOTE: app.setup takes these positionally -- the order here must match its
    # signature exactly (note that no_checks is inverted into "do checks").
    app.setup(
        snapshot_path,
        bootstrap_path,
        ads_path,
        config_path,
        ambex_pid,
        kick,
        banner_endpoint,
        metrics_endpoint,
        k8s,
        not no_checks,
        no_envoy,
        reload,
        debug,
        verbose,
        notices,
        validation_retries,
        allow_fs_commands,
        local_scout,
        report_action_keys,
        enable_fast_reconfigure,
    )

    if not workers:
        workers = number_of_workers()

    # The --workers count is used as gunicorn's thread count, not its
    # (process) worker count.
    gunicorn_config = {
        "bind": "%s:%s" % (host, port),
        # 'workers': 1,
        "threads": workers,
    }

    app.logger.info(
        "thread count %d, listening on %s" % (gunicorn_config["threads"], gunicorn_config["bind"])
    )

    # Blocks until gunicorn exits.
    StandaloneApplication(app, gunicorn_config).run()
2450
2451
# Script entry point: hand off to the click-decorated main().
if __name__ == "__main__":
    main()