1#!python
2
3# Copyright 2018 Datawire. All rights reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License
16import concurrent.futures
17import copy
18import datetime
19import difflib
20import functools
21import json
22import logging
23import multiprocessing
24import os
25import queue
26import re
27import signal
28import subprocess
29import sys
30import threading
31import time
32import traceback
33import uuid
34from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
35from typing import cast as typecast
36
37import click
38import gunicorn.app.base
39import jsonpatch
40import requests
41from expiringdict import ExpiringDict
42from flask import Flask, Response
43from flask import json as flask_json
44from flask import jsonify, render_template, request, send_from_directory
45from pkg_resources import Requirement, resource_filename
46from prometheus_client import CollectorRegistry, Gauge, Info, ProcessCollector, generate_latest
47from pythonjsonlogger import jsonlogger
48
49from ambassador import IR, Cache, Config, Diagnostics, EnvoyConfig, Scout, Version
50from ambassador.constants import Constants
51from ambassador.diagnostics import EnvoyStats, EnvoyStatsMgr
52from ambassador.fetch import ResourceFetcher
53from ambassador.ir.irambassador import IRAmbassador
54from ambassador.reconfig_stats import ReconfigStats
55from ambassador.utils import (
56 FSSecretHandler,
57 PeriodicTrigger,
58 SecretHandler,
59 SystemInfo,
60 Timer,
61 dump_json,
62 load_url_contents,
63 parse_bool,
64 parse_json,
65)
66
67if TYPE_CHECKING:
68 from ambassador.ir.irtlscontext import IRTLSContext # pragma: no cover
69
70__version__ = Version
71
72boot_time = datetime.datetime.now()
73
74# allows 10 concurrent users, with a request timeout of 60 seconds
75tvars_cache = ExpiringDict(max_len=10, max_age_seconds=60)
76
77logHandler = None
78if parse_bool(os.environ.get("AMBASSADOR_JSON_LOGGING", "false")):
79 jsonFormatter = jsonlogger.JsonFormatter(
80 "%%(asctime)s %%(filename)s %%(lineno)d %%(process)d (threadName)s %%(levelname)s %%(message)s"
81 )
82 logHandler = logging.StreamHandler()
83 logHandler.setFormatter(jsonFormatter)
84
85 # Set the root logger to INFO level and tell it to use the new log handler.
86 logger = logging.getLogger()
87 logger.setLevel(logging.INFO)
88 logger.addHandler(logHandler)
89
90 # Update all of the other loggers to also use the new log handler.
91 loggingManager = getattr(logging.root, "manager", None)
92 if loggingManager is not None:
93 for name in loggingManager.loggerDict:
94 logging.getLogger(name).addHandler(logHandler)
95 else:
96 print("Could not find a logging manager. Some logging may not be properly JSON formatted!")
97else:
98 # Default log level
99 level = logging.INFO
100
101 # Check for env var log level
102 if level_name := os.getenv("AES_LOG_LEVEL"):
103 level_number = logging.getLevelName(level_name.upper())
104
105 if isinstance(level_number, int):
106 level = level_number
107
108 # Set defauts for all loggers
109 logging.basicConfig(
110 level=level,
111 format="%%(asctime)s diagd %s [P%%(process)dT%%(threadName)s] %%(levelname)s: %%(message)s"
112 % __version__,
113 datefmt="%Y-%m-%d %H:%M:%S",
114 )
115
116# Shut up Werkzeug's standard request logs -- they're just too noisy.
117logging.getLogger("werkzeug").setLevel(logging.CRITICAL)
118
119# Likewise make requests a bit quieter.
120logging.getLogger("urllib3").setLevel(logging.WARNING)
121logging.getLogger("requests").setLevel(logging.WARNING)
122
123ambassador_targets = {
124 "mapping": "https://www.getambassador.io/reference/configuration#mappings",
125 "module": "https://www.getambassador.io/reference/configuration#modules",
126}
127
128# envoy_targets = {
129# 'route': 'https://envoyproxy.github.io/envoy/configuration/http_conn_man/route_config/route.html',
130# 'cluster': 'https://envoyproxy.github.io/envoy/configuration/cluster_manager/cluster.html',
131# }
132
133
134def number_of_workers():
135 return (multiprocessing.cpu_count() * 2) + 1
136
137
138class DiagApp(Flask):
139 cache: Optional[Cache]
140 ambex_pid: int
141 kick: Optional[str]
142 estatsmgr: EnvoyStatsMgr
143 config_path: Optional[str]
144 snapshot_path: str
145 bootstrap_path: str
146 ads_path: str
147 clustermap_path: str
148 health_checks: bool
149 no_envoy: bool
150 debugging: bool
151 allow_fs_commands: bool
152 report_action_keys: bool
153 verbose: bool
154 notice_path: str
155 logger: logging.Logger
156 aconf: Config
157 ir: Optional[IR]
158 econf: Optional[EnvoyConfig]
159 # self.diag is actually a property
160 _diag: Optional[Diagnostics]
161 notices: "Notices"
162 scout: Scout
163 watcher: "AmbassadorEventWatcher"
164 stats_updater: Optional[PeriodicTrigger]
165 scout_checker: Optional[PeriodicTrigger]
166 last_request_info: Dict[str, int]
167 last_request_time: Optional[datetime.datetime]
168 latest_snapshot: str
169 banner_endpoint: Optional[str]
170 metrics_endpoint: Optional[str]
171
172 # Reconfiguration stats
173 reconf_stats: ReconfigStats
174
175 # Custom metrics registry to weed-out default metrics collectors because the
176 # default collectors can't be prefixed/namespaced with ambassador_.
177 # Using the default metrics collectors would lead to name clashes between the Python and Go instrumentations.
178 metrics_registry: CollectorRegistry
179
180 config_lock: threading.Lock
181 diag_lock: threading.Lock
182
183 def setup(
184 self,
185 snapshot_path: str,
186 bootstrap_path: str,
187 ads_path: str,
188 config_path: Optional[str],
189 ambex_pid: int,
190 kick: Optional[str],
191 banner_endpoint: Optional[str],
192 metrics_endpoint: Optional[str],
193 k8s=False,
194 do_checks=True,
195 no_envoy=False,
196 reload=False,
197 debug=False,
198 verbose=False,
199 notices=None,
200 validation_retries=5,
201 allow_fs_commands=False,
202 local_scout=False,
203 report_action_keys=False,
204 enable_fast_reconfigure=False,
205 clustermap_path=None,
206 ):
207 self.health_checks = do_checks
208 self.no_envoy = no_envoy
209 self.debugging = reload
210 self.verbose = verbose
211 self.notice_path = notices
212 self.notices = Notices(self.notice_path)
213 self.notices.reset()
214 self.k8s = k8s
215 self.validation_retries = validation_retries
216 self.allow_fs_commands = allow_fs_commands
217 self.local_scout = local_scout
218 self.report_action_keys = report_action_keys
219 self.banner_endpoint = banner_endpoint
220 self.metrics_endpoint = metrics_endpoint
221 self.metrics_registry = CollectorRegistry(auto_describe=True)
222 self.enable_fast_reconfigure = enable_fast_reconfigure
223
224 # Init logger, inherits settings from default
225 self.logger = logging.getLogger("ambassador.diagd")
226
227 # Initialize the Envoy stats manager...
228 self.estatsmgr = EnvoyStatsMgr(self.logger)
229
230 # ...and the incremental-reconfigure stats.
231 self.reconf_stats = ReconfigStats(self.logger)
232
233 # This will raise an exception and crash if you pass it a string. That's intentional.
234 self.ambex_pid = int(ambex_pid)
235 self.kick = kick
236
237 # Initialize the cache if we're allowed to.
238 if self.enable_fast_reconfigure:
239 self.logger.info("AMBASSADOR_FAST_RECONFIGURE enabled, initializing cache")
240 self.cache = Cache(self.logger)
241 else:
242 self.logger.info("AMBASSADOR_FAST_RECONFIGURE disabled, not initializing cache")
243 self.cache = None
244
245 # Use Timers to keep some stats on reconfigurations
246 self.config_timer = Timer("reconfiguration", self.metrics_registry)
247 self.fetcher_timer = Timer("Fetcher", self.metrics_registry)
248 self.aconf_timer = Timer("AConf", self.metrics_registry)
249 self.ir_timer = Timer("IR", self.metrics_registry)
250 self.econf_timer = Timer("EConf", self.metrics_registry)
251 self.diag_timer = Timer("Diagnostics", self.metrics_registry)
252
253 # Use gauges to keep some metrics on active config
254 self.diag_errors = Gauge(
255 f"diagnostics_errors",
256 f"Number of configuration errors",
257 namespace="ambassador",
258 registry=self.metrics_registry,
259 )
260 self.diag_notices = Gauge(
261 f"diagnostics_notices",
262 f"Number of configuration notices",
263 namespace="ambassador",
264 registry=self.metrics_registry,
265 )
266 self.diag_log_level = Gauge(
267 f"log_level",
268 f"Debug log level enabled or not",
269 ["level"],
270 namespace="ambassador",
271 registry=self.metrics_registry,
272 )
273
274 if debug:
275 self.logger.setLevel(logging.DEBUG)
276 self.diag_log_level.labels("debug").set(1)
277 logging.getLogger("ambassador").setLevel(logging.DEBUG)
278 else:
279 self.diag_log_level.labels("debug").set(0)
280
281 # Assume that we will NOT update Mapping status.
282 ksclass: Type[KubeStatus] = KubeStatusNoMappings
283
284 if os.environ.get("AMBASSADOR_UPDATE_MAPPING_STATUS", "false").lower() == "true":
285 self.logger.info("WILL update Mapping status")
286 ksclass = KubeStatus
287 else:
288 self.logger.info("WILL NOT update Mapping status")
289
290 self.kubestatus = ksclass(self)
291
292 self.config_path = config_path
293 self.bootstrap_path = bootstrap_path
294 self.ads_path = ads_path
295 self.snapshot_path = snapshot_path
296 self.clustermap_path = clustermap_path or os.path.join(
297 os.path.dirname(self.bootstrap_path), "clustermap.json"
298 )
299
300 # You must hold config_lock when updating config elements (including diag!).
301 self.config_lock = threading.Lock()
302
303 # When generating new diagnostics, there's a dance around config_lock and
304 # diag_lock -- see the diag() property.
305 self.diag_lock = threading.Lock()
306
307 # Why are we doing this? Aren't we sure we're singlethreaded here?
308 # Well, yes. But self.diag is actually a property, and it will raise an
309 # assertion failure if we're not holding self.config_lock... and once
310 # the lock is in play at all, we're gonna time it, in case my belief
311 # that grabbing the lock here is always effectively free turns out to
312 # be wrong.
313
314 with self.config_lock:
315 self.ir = None # don't update unless you hold config_lock
316 self.econf = None # don't update unless you hold config_lock
317 self.diag = None # don't update unless you hold config_lock
318
319 self.stats_updater = None
320 self.scout_checker = None
321
322 self.last_request_info = {}
323 self.last_request_time = None
324
325 # self.scout = Scout(update_frequency=datetime.timedelta(seconds=10))
326 self.scout = Scout(local_only=self.local_scout)
327
328 ProcessCollector(namespace="ambassador", registry=self.metrics_registry)
329 metrics_info = Info(
330 name="diagnostics",
331 namespace="ambassador",
332 documentation="Ambassador diagnostic info",
333 registry=self.metrics_registry,
334 )
335 metrics_info.info(
336 {
337 "version": __version__,
338 "ambassador_id": Config.ambassador_id,
339 "cluster_id": os.environ.get(
340 "AMBASSADOR_CLUSTER_ID",
341 os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
342 ),
343 "single_namespace": str(Config.single_namespace),
344 }
345 )
346
347 @property
348 def diag(self) -> Optional[Diagnostics]:
349 """
350 It turns out to be expensive to generate the Diagnostics class, so
351 app.diag is a property that does it on demand, handling Timers and
352 the config lock for you.
353
354 You MUST NOT already hold the config_lock or the diag_lock when
355 trying to read app.diag.
356
357 You MUST already have loaded an IR.
358 """
359
360 # The config_lock is meant to make sure that we don't ever update
361 # self.diag in two places at once, so grab that first.
362 with self.config_lock:
363 # If we've already generated diagnostics...
364 if app._diag:
365 # ...then we're good to go.
366 return app._diag
367
368 # If here, we have _not_ generated diagnostics, and we've dropped the
369 # config lock so as not to block anyone else. Next up: grab the diag
370 # lock, because we'd rather not have two diag generations happening at
371 # once.
372 with self.diag_lock:
373 # Did someone else sneak in between releasing the config lock and
374 # grabbing the diag lock?
375 if app._diag:
376 # Yup. Use their work.
377 return app._diag
378
379 # OK, go generate diagnostics.
380 _diag = self._generate_diagnostics()
381
382 # If that didn't work, no point in messing with the config lock.
383 if not _diag:
384 return None
385
386 # Once here, we need to - once again - grab the config lock to update
387 # app._diag. This is safe because this is the only place we ever mess
388 # with the diag lock, so nowhere else will try to grab the diag lock
389 # while holding the config lock.
390 with app.config_lock:
391 app._diag = _diag
392
393 # Finally, we can return app._diag to our caller.
394 return app._diag
395
396 @diag.setter
397 def diag(self, diag: Optional[Diagnostics]) -> None:
398 """
399 It turns out to be expensive to generate the Diagnostics class, so
400 app.diag is a property that does it on demand, handling Timers and
401 the config lock for you.
402
403 You MUST already hold the config_lock when trying to update app.diag.
404 You MUST NOT hold the diag_lock.
405 """
406 self._diag = diag
407
408 def _generate_diagnostics(self) -> Optional[Diagnostics]:
409 """
410 Do the heavy lifting of generating Diagnostics for our current configuration.
411 Really, only the diag() property should be calling this method, but if you
412 are convinced that you need to call it from elsewhere:
413
414 1. You're almost certainly wrong about needing to call it from elsewhere.
415 2. You MUST hold the diag_lock when calling this method.
416 3. You MUST NOT hold the config_lock when calling this method.
417 4. No, really, you're wrong. Don't call this method from anywhere but the
418 diag() property.
419 """
420
421 # Make sure we have an IR and econf to work with.
422 if not app.ir or not app.econf:
423 # Nope, bail.
424 return None
425
426 # OK, go ahead and generate diagnostics. Use the diag_timer to time
427 # this.
428 with self.diag_timer:
429 _diag = Diagnostics(app.ir, app.econf)
430
431 # Update some metrics data points given the new generated Diagnostics
432 diag_dict = _diag.as_dict()
433 self.diag_errors.set(len(diag_dict.get("errors", [])))
434 self.diag_notices.set(len(diag_dict.get("notices", [])))
435
436 # Note that we've updated diagnostics, since that might trigger a
437 # timer log.
438 self.reconf_stats.mark("diag")
439
440 return _diag
441
442 def check_scout(self, what: str) -> None:
443 self.watcher.post("SCOUT", (what, self.ir))
444
445 def post_timer_event(self) -> None:
446 # Post an event to do a timer check.
447 self.watcher.post("TIMER", None)
448
449 def check_timers(self) -> None:
450 # Actually do the timer check.
451
452 if self.reconf_stats.needs_timers():
453 # OK! Log the timers...
454
455 for t in [
456 self.config_timer,
457 self.fetcher_timer,
458 self.aconf_timer,
459 self.ir_timer,
460 self.econf_timer,
461 self.diag_timer,
462 ]:
463 if t:
464 self.logger.info(t.summary())
465
466 # ...and the cache statistics, if we can.
467 if self.cache:
468 self.cache.dump_stats()
469
470 # Always dump the reconfiguration stats...
471 self.reconf_stats.dump()
472
473 # ...and mark that the timers have been logged.
474 self.reconf_stats.mark_timers_logged()
475
476 # In this case we need to check to see if it's time to do a configuration
477 # check, too.
478 if self.reconf_stats.needs_check():
479 result = False
480
481 try:
482 result = self.check_cache()
483 except Exception as e:
484 tb = "\n".join(traceback.format_exception(*sys.exc_info()))
485 self.logger.error("CACHE: CHECK FAILED: %s\n%s" % (e, tb))
486
487 # Mark that the check has happened.
488 self.reconf_stats.mark_checked(result)
489
490 @staticmethod
491 def json_diff(what: str, j1: str, j2: str) -> str:
492 output = ""
493
494 l1 = j1.split("\n")
495 l2 = j2.split("\n")
496
497 n1 = f"{what} incremental"
498 n2 = f"{what} nonincremental"
499
500 output += "\n--------\n"
501
502 for line in difflib.context_diff(l1, l2, fromfile=n1, tofile=n2):
503 line = line.rstrip()
504 output += line
505 output += "\n"
506
507 return output
508
509 def check_cache(self) -> bool:
510 # We're going to build a shiny new IR and econf from our existing aconf, and make
511 # sure everything matches. We will _not_ use the existing cache for this.
512 #
513 # For this, make sure we have an IR already...
514 assert self.aconf
515 assert self.ir
516 assert self.econf
517
518 # Compute this IR/econf with a new empty cache. It saves a lot of trouble with
519 # having to delete cache keys from the JSON.
520
521 self.logger.debug("CACHE: starting check")
522 cache = Cache(self.logger)
523 scc = SecretHandler(app.logger, "check_cache", app.snapshot_path, "check")
524 ir = IR(self.aconf, secret_handler=scc, cache=cache)
525 econf = EnvoyConfig.generate(ir, Config.envoy_api_version, cache=cache)
526
527 # This is testing code.
528 # name = list(ir.clusters.keys())[0]
529 # del(ir.clusters[name])
530
531 i1 = self.ir.as_json()
532 i2 = ir.as_json()
533
534 e1 = self.econf.as_json()
535 e2 = econf.as_json()
536
537 result = True
538 errors = ""
539
540 if i1 != i2:
541 result = False
542 self.logger.error("CACHE: IR MISMATCH")
543 errors += "IR diffs:\n"
544 errors += self.json_diff("IR", i1, i2)
545
546 if e1 != e2:
547 result = False
548 self.logger.error("CACHE: ENVOY CONFIG MISMATCH")
549 errors += "econf diffs:\n"
550 errors += self.json_diff("econf", i1, i2)
551
552 if not result:
553 err_path = os.path.join(self.snapshot_path, "diff-tmp.txt")
554
555 open(err_path, "w").write(errors)
556
557 snapcount = int(os.environ.get("AMBASSADOR_SNAPSHOT_COUNT", "4"))
558 snaplist: List[Tuple[str, str]] = []
559
560 if snapcount > 0:
561 # If snapcount is 4, this range statement becomes range(-4, -1)
562 # which gives [ -4, -3, -2 ], which the list comprehension turns
563 # into [ ( "-3", "-4" ), ( "-2", "-3" ), ( "-1", "-2" ) ]...
564 # which is the list of suffixes to rename to rotate the snapshots.
565
566 snaplist += [(str(x + 1), str(x)) for x in range(-1 * snapcount, -1)]
567
568 # After dealing with that, we need to rotate the current file into -1.
569 snaplist.append(("", "-1"))
570
571 # Whether or not we do any rotation, we need to cycle in the '-tmp' file.
572 snaplist.append(("-tmp", ""))
573
574 for from_suffix, to_suffix in snaplist:
575 from_path = os.path.join(app.snapshot_path, "diff{}.txt".format(from_suffix))
576 to_path = os.path.join(app.snapshot_path, "diff{}.txt".format(to_suffix))
577
578 try:
579 self.logger.debug("rotate: %s -> %s" % (from_path, to_path))
580 os.rename(from_path, to_path)
581 except IOError as e:
582 self.logger.debug("skip %s -> %s: %s" % (from_path, to_path, e))
583 except Exception as e:
584 self.logger.debug("could not rename %s -> %s: %s" % (from_path, to_path, e))
585
586 self.logger.info("CACHE: check %s" % ("succeeded" if result else "failed"))
587
588 return result
589
590
591# get the "templates" directory, or raise "FileNotFoundError" if not found
592def get_templates_dir():
593 res_dir = None
594 try:
595 # this will fail when not in a distribution
596 res_dir = resource_filename(Requirement.parse("ambassador"), "templates")
597 except:
598 pass
599
600 maybe_dirs = [res_dir, os.path.join(os.path.dirname(__file__), "..", "templates")]
601 for d in maybe_dirs:
602 if d and os.path.isdir(d):
603 return d
604 raise FileNotFoundError
605
606
607# Get the Flask app defined early. Setup happens later.
608app = DiagApp(__name__, template_folder=get_templates_dir())
609
610
611######## DECORATORS
612
613
614def standard_handler(f):
615 func_name = getattr(f, "__name__", "<anonymous>")
616
617 @functools.wraps(f)
618 def wrapper(*args, **kwds):
619 reqid = str(uuid.uuid4()).upper()
620 prefix = '%s: %s "%s %s"' % (reqid, request.remote_addr, request.method, request.path)
621
622 app.logger.debug("%s START" % prefix)
623
624 start = datetime.datetime.now()
625
626 app.logger.debug("%s handler %s" % (prefix, func_name))
627
628 # Getting elements in the `tvars_cache` will make sure eviction happens on `max_age_seconds` TTL
629 # for removed patch_client rather than waiting to fill `max_len`.
630 # Looping over a copied list of keys, to prevent mutating tvars_cache while iterating.
631 for k in list(tvars_cache.keys()):
632 tvars_cache.get(k)
633
634 # Default to the exception case
635 result_to_log = "server error"
636 status_to_log = 500
637 result_log_level = logging.ERROR
638 result = Response(result_to_log, status_to_log)
639
640 try:
641 result = f(*args, reqid=reqid, **kwds)
642 if not isinstance(result, Response):
643 result = Response(f"Invalid handler result {result}", status_to_log)
644
645 status_to_log = result.status_code
646
647 if (status_to_log // 100) == 2:
648 result_log_level = logging.INFO
649 result_to_log = "success"
650 else:
651 result_log_level = logging.ERROR
652 result_to_log = "failure"
653 except Exception as e:
654 app.logger.exception(e)
655
656 end = datetime.datetime.now()
657 ms = int(((end - start).total_seconds() * 1000) + 0.5)
658
659 app.logger.log(
660 result_log_level, "%s %dms %d %s" % (prefix, ms, status_to_log, result_to_log)
661 )
662
663 return result
664
665 return wrapper
666
667
668def internal_handler(f):
669 """
670 Reject requests where the remote address is not localhost. See the docstring
671 for _is_local_request() for important caveats!
672 """
673 func_name = getattr(f, "__name__", "<anonymous>")
674
675 @functools.wraps(f)
676 def wrapper(*args, **kwds):
677 if not _is_local_request():
678 return "Forbidden\n", 403
679 return f(*args, **kwds)
680
681 return wrapper
682
683
684######## UTILITIES
685
686
687def _is_local_request() -> bool:
688 """
689 Determine if this request originated with localhost.
690
691 We rely on healthcheck_server.go setting the X-Ambassador-Diag-IP header for us
692 (and we rely on it overwriting anything that's already there!).
693
694 It might be possible to consider the environment variables SERVER_NAME and
695 SERVER_PORT instead, as those are allegedly required by WSGI... but attempting
696 to do so in Flask/GUnicorn yielded a worse implementation that was still not
697 portable.
698
699 """
700
701 remote_addr: Optional[str] = ""
702
703 remote_addr = request.headers.get("X-Ambassador-Diag-IP")
704
705 return remote_addr == "127.0.0.1"
706
707
708def _allow_diag_ui() -> bool:
709 """
710 Helper function to check if diag ui traffic is allowed or not
711 based on the different flags from the config:
712 * diagnostics.enabled: Enable to diag UI by adding mappings
713 * diagnostics.allow_non_local: Allow non local traffic
714 even when diagnotics UI is disabled.
715 Mappings are not added for the diag UI
716 but the diagnotics UI is still exposed for
717 the pod IP in the admin port.
718 * local traffic or not: When diagnotics disagled and allow_non_local is false,
719 allow traffic only from localhost clients
720 """
721 enabled = False
722 allow_non_local = False
723 ir = app.ir
724 if ir:
725 enabled = ir.ambassador_module.diagnostics.get("enabled", False)
726 allow_non_local = ir.ambassador_module.diagnostics.get("allow_non_local", False)
727 if not enabled and not _is_local_request() and not allow_non_local:
728 return False
729 return True
730
731
732class Notices:
733 def __init__(self, local_config_path: str) -> None:
734 self.local_path = local_config_path
735 self.notices: List[Dict[str, str]] = []
736
737 def reset(self):
738 local_notices: List[Dict[str, str]] = []
739 local_data = ""
740
741 try:
742 local_stream = open(self.local_path, "r")
743 local_data = local_stream.read()
744 local_notices = parse_json(local_data)
745 except OSError:
746 pass
747 except:
748 local_notices.append(
749 {"level": "ERROR", "message": "bad local notices: %s" % local_data}
750 )
751
752 self.notices = local_notices
753 # app.logger.info("Notices: after RESET: %s" % dump_json(self.notices))
754
755 def post(self, notice):
756 # app.logger.debug("Notices: POST %s" % notice)
757 self.notices.append(notice)
758 # app.logger.info("Notices: after POST: %s" % dump_json(self.notices))
759
760 def prepend(self, notice):
761 # app.logger.debug("Notices: PREPEND %s" % notice)
762 self.notices.insert(0, notice)
763 # app.logger.info("Notices: after PREPEND: %s" % dump_json(self.notices))
764
765 def extend(self, notices):
766 for notice in notices:
767 self.post(notice)
768
769
770def td_format(td_object):
771 seconds = int(td_object.total_seconds())
772 periods = [
773 ("year", 60 * 60 * 24 * 365),
774 ("month", 60 * 60 * 24 * 30),
775 ("day", 60 * 60 * 24),
776 ("hour", 60 * 60),
777 ("minute", 60),
778 ("second", 1),
779 ]
780
781 strings = []
782 for period_name, period_seconds in periods:
783 if seconds > period_seconds:
784 period_value, seconds = divmod(seconds, period_seconds)
785
786 strings.append(
787 "%d %s%s" % (period_value, period_name, "" if (period_value == 1) else "s")
788 )
789
790 formatted = ", ".join(strings)
791
792 if not formatted:
793 formatted = "0s"
794
795 return formatted
796
797
798def interval_format(seconds, normal_format, now_message):
799 if seconds >= 1:
800 return normal_format % td_format(datetime.timedelta(seconds=seconds))
801 else:
802 return now_message
803
804
805def system_info(app):
806 ir = app.ir
807 debug_mode = False
808
809 if ir:
810 amod = ir.ambassador_module
811 debug_mode = amod.get("debug_mode", False)
812
813 app.logger.debug(f"DEBUG_MODE {debug_mode}")
814
815 status_dict = {"config failure": [False, "no configuration loaded"]}
816
817 env_status = getattr(app.watcher, "env_status", None)
818
819 if env_status:
820 status_dict = env_status.to_dict()
821 app.logger.debug(f"status_dict {status_dict}")
822
823 return {
824 "version": __version__,
825 "hostname": SystemInfo.MyHostName,
826 "ambassador_id": Config.ambassador_id,
827 "ambassador_namespace": Config.ambassador_namespace,
828 "single_namespace": Config.single_namespace,
829 "knative_enabled": os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "").lower() == "true",
830 "statsd_enabled": os.environ.get("STATSD_ENABLED", "").lower() == "true",
831 "endpoints_enabled": Config.enable_endpoints,
832 "cluster_id": os.environ.get(
833 "AMBASSADOR_CLUSTER_ID",
834 os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
835 ),
836 "boot_time": boot_time,
837 "hr_uptime": td_format(datetime.datetime.now() - boot_time),
838 "latest_snapshot": app.latest_snapshot,
839 "env_good": getattr(app.watcher, "env_good", False),
840 "env_failures": getattr(app.watcher, "failure_list", ["no IR loaded"]),
841 "env_status": status_dict,
842 "debug_mode": debug_mode,
843 }
844
845
846def envoy_status(estats: EnvoyStats):
847 since_boot = interval_format(estats.time_since_boot(), "%s", "less than a second")
848
849 since_update = "Never updated"
850
851 if estats.time_since_update():
852 since_update = interval_format(
853 estats.time_since_update(), "%s ago", "within the last second"
854 )
855
856 return {
857 "alive": estats.is_alive(),
858 "ready": estats.is_ready(),
859 "uptime": since_boot,
860 "since_update": since_update,
861 }
862
863
864def drop_serializer_key(d: Dict[Any, Any]) -> Dict[Any, Any]:
865 """
866 Delete the "serialization" key (if present) in any dictionary passed in and
867 return that dictionary. This function is intended to be used as the
868 object_hook value for json.load[s].
869 """
870 _ = d.pop("serialization", None)
871 return d
872
873
874def filter_keys(d: Dict[Any, Any], keys_to_keep):
875 unwanted_keys = set(d) - set(keys_to_keep)
876 for unwanted_key in unwanted_keys:
877 del d[unwanted_key]
878
879
880def filter_webui(d: Dict[Any, Any]):
881 filter_keys(
882 d,
883 [
884 "system",
885 "route_info",
886 "source_map",
887 "ambassador_resolvers",
888 "ambassador_services",
889 "envoy_status",
890 "cluster_stats",
891 "loginfo",
892 "errors",
893 ],
894 )
895 for ambassador_resolver in d["ambassador_resolvers"]:
896 filter_keys(ambassador_resolver, ["_source", "kind"])
897 for route_info in d["route_info"]:
898 filter_keys(route_info, ["diag_class", "key", "headers", "precedence", "clusters"])
899 for cluster in route_info["clusters"]:
900 filter_keys(cluster, ["_hcolor", "type_label", "service", "weight"])
901
902
903@app.route("/_internal/v0/ping", methods=["GET"])
904@internal_handler
905def handle_ping():
906 return "ACK\n", 200
907
908
909@app.route("/_internal/v0/features", methods=["GET"])
910@internal_handler
911def handle_features():
912 # If we don't have an IR yet, do nothing.
913 #
914 # We don't bother grabbing the config_lock here because we're not changing
915 # anything, and an features request hitting at exactly the same moment as
916 # the first configure is a race anyway. If it fails, that's not a big deal,
917 # they can try again.
918 if not app.ir:
919 app.logger.debug("Features: configuration required first")
920 return "Can't do features before configuration", 503
921
922 return jsonify(app.ir.features()), 200
923
924
925@app.route("/_internal/v0/watt", methods=["POST"])
926@internal_handler
927def handle_watt_update():
928 url = request.args.get("url", None)
929
930 if not url:
931 app.logger.error("error: watt update requested with no URL")
932 return "error: watt update requested with no URL\n", 400
933
934 app.logger.debug("Update requested: watt, %s" % url)
935
936 status, info = app.watcher.post("CONFIG", ("watt", url))
937
938 return info, status
939
940
941@app.route("/_internal/v0/fs", methods=["POST"])
942@internal_handler
943def handle_fs():
944 path = request.args.get("path", None)
945
946 if not path:
947 app.logger.error("error: update requested with no PATH")
948 return "error: update requested with no PATH\n", 400
949
950 app.logger.debug("Update requested from %s" % path)
951
952 status, info = app.watcher.post("CONFIG_FS", path)
953
954 return info, status
955
956
957@app.route("/_internal/v0/events", methods=["GET"])
958@internal_handler
959def handle_events():
960 if not app.local_scout:
961 return "Local Scout is not enabled\n", 400
962
963 event_dump = [
964 (x["local_scout_timestamp"], x["mode"], x["action"], x) for x in app.scout._scout.events
965 ]
966
967 app.logger.debug(f"Event dump {event_dump}")
968
969 return jsonify(event_dump)
970
971
972@app.route("/ambassador/v0/favicon.ico", methods=["GET"])
973def favicon():
974 template_path = resource_filename(Requirement.parse("ambassador"), "templates")
975
976 return send_from_directory(template_path, "favicon.ico")
977
978
979@app.route("/ambassador/v0/check_alive", methods=["GET"])
980def check_alive():
981 status = envoy_status(app.estatsmgr.get_stats())
982
983 if status["alive"]:
984 return "ambassador liveness check OK (%s)\n" % status["uptime"], 200
985 else:
986 return "ambassador seems to have died (%s)\n" % status["uptime"], 503
987
988
989@app.route("/ambassador/v0/check_ready", methods=["GET"])
990def check_ready():
991 if not app.ir:
992 return "ambassador waiting for config\n", 503
993
994 status = envoy_status(app.estatsmgr.get_stats())
995
996 if status["ready"]:
997 return "ambassador readiness check OK (%s)\n" % status["since_update"], 200
998 else:
999 return "ambassador not ready (%s)\n" % status["since_update"], 503
1000
1001
1002@app.route("/ambassador/v0/diag/", methods=["GET"])
1003@standard_handler
1004def show_overview(reqid=None):
1005 # If we don't have an IR yet, do nothing.
1006 #
1007 # We don't bother grabbing the config_lock here because we're not changing
1008 # anything, and an overview request hitting at exactly the same moment as
1009 # the first configure is a race anyway. If it fails, that's not a big deal,
1010 # they can try again.
1011 if not app.ir:
1012 app.logger.debug("OV %s - can't do overview before configuration" % reqid)
1013 return "Can't do overview before configuration", 503
1014
1015 if not _allow_diag_ui():
1016 return Response("Not found\n", 404)
1017
1018 app.logger.debug("OV %s - showing overview" % reqid)
1019
1020 # Remember that app.diag is a property that can involve some real expense
1021 # to compute -- we don't want to call it more than once here, so we cache
1022 # its value.
1023 diag = app.diag
1024
1025 if app.verbose:
1026 app.logger.debug("OV %s: DIAG" % reqid)
1027 app.logger.debug("%s" % dump_json(diag.as_dict(), pretty=True))
1028
1029 estats = app.estatsmgr.get_stats()
1030 ov = diag.overview(request, estats)
1031
1032 if app.verbose:
1033 app.logger.debug("OV %s: OV" % reqid)
1034 app.logger.debug("%s" % dump_json(ov, pretty=True))
1035 app.logger.debug("OV %s: collecting errors" % reqid)
1036
1037 ddict = collect_errors_and_notices(request, reqid, "overview", diag)
1038
1039 banner_content = None
1040 if app.banner_endpoint and app.ir and app.ir.edge_stack_allowed:
1041 try:
1042 response = requests.get(app.banner_endpoint)
1043 if response.status_code == 200:
1044 banner_content = response.text
1045 except Exception as e:
1046 app.logger.error("could not get banner_content: %s" % e)
1047
1048 tvars = dict(
1049 system=system_info(app),
1050 envoy_status=envoy_status(estats),
1051 loginfo=app.estatsmgr.loginfo,
1052 notices=app.notices.notices,
1053 banner_content=banner_content,
1054 **ov,
1055 **ddict,
1056 )
1057
1058 patch_client = request.args.get("patch_client", None)
1059 if request.args.get("json", None):
1060 filter_key = request.args.get("filter", None)
1061
1062 if filter_key == "webui":
1063 filter_webui(tvars)
1064 elif filter_key:
1065 return jsonify(tvars.get(filter_key, None))
1066
1067 if patch_client:
1068 # Assume this is the Admin UI. Recursively drop all "serialization"
1069 # keys. This avoids leaking secrets and generally makes the
1070 # snapshot a lot smaller without losing information that the Admin
1071 # UI cares about. We do this below by setting the object_hook
1072 # parameter of the json.loads(...) call. We have to use python's
1073 # json library instead of orjson, because orjson does not support
1074 # the object_hook feature.
1075
1076 # Get the previous full representation
1077 cached_tvars_json = tvars_cache.get(patch_client, dict())
1078 # Serialize the tvars into a json-string using the same jsonify Flask serializer, then load the json object
1079 response_content = json.loads(flask_json.dumps(tvars), object_hook=drop_serializer_key)
1080 # Diff between the previous representation and the current full representation (http://jsonpatch.com/)
1081 patch = jsonpatch.make_patch(cached_tvars_json, response_content)
1082 # Save the current full representation in memory
1083 tvars_cache[patch_client] = response_content
1084
1085 # Return only the diff
1086 return Response(patch.to_string(), mimetype="application/json")
1087 else:
1088 return jsonify(tvars)
1089 else:
1090 app.check_scout("overview")
1091 return Response(render_template("overview.html", **tvars))
1092
1093
1094def collect_errors_and_notices(request, reqid, what: str, diag: Diagnostics) -> Dict:
1095 loglevel = request.args.get("loglevel", None)
1096 notice = None
1097
1098 if loglevel:
1099 app.logger.debug("%s %s -- requesting loglevel %s" % (what, reqid, loglevel))
1100 app.diag_log_level.labels("debug").set(1 if loglevel == "debug" else 0)
1101
1102 if not app.estatsmgr.update_log_levels(time.time(), level=loglevel):
1103 notice = {"level": "WARNING", "message": "Could not update log level!"}
1104 # else:
1105 # return redirect("/ambassador/v0/diag/", code=302)
1106
1107 # We need to grab errors and notices from diag.as_dict(), process the errors so
1108 # they work for the HTML rendering, and post the notices to app.notices. Then we
1109 # return the dict representation that our caller should work with.
1110
1111 ddict = diag.as_dict()
1112
1113 # app.logger.debug("ddict %s" % dump_json(ddict, pretty=True))
1114
1115 derrors = ddict.pop("errors", {})
1116
1117 errors = []
1118
1119 for err_key, err_list in derrors.items():
1120 if err_key == "-global-":
1121 err_key = ""
1122
1123 for err in err_list:
1124 errors.append((err_key, err["error"]))
1125
1126 dnotices = ddict.pop("notices", {})
1127
1128 # Make sure that anything about the loglevel gets folded into this set.
1129 if notice:
1130 app.notices.prepend(notice)
1131
1132 for notice_key, notice_list in dnotices.items():
1133 for notice in notice_list:
1134 app.notices.post({"level": "NOTICE", "message": "%s: %s" % (notice_key, notice)})
1135
1136 ddict["errors"] = errors
1137
1138 return ddict
1139
1140
1141@app.route("/ambassador/v0/diag/<path:source>", methods=["GET"])
1142@standard_handler
1143def show_intermediate(source=None, reqid=None):
1144 # If we don't have an IR yet, do nothing.
1145 #
1146 # We don't bother grabbing the config_lock here because we're not changing
1147 # anything, and an overview request hitting at exactly the same moment as
1148 # the first configure is a race anyway. If it fails, that's not a big deal,
1149 # they can try again.
1150 if not app.ir:
1151 app.logger.debug(
1152 "SRC %s - can't do intermediate for %s before configuration" % (reqid, source)
1153 )
1154 return "Can't do overview before configuration", 503
1155
1156 if not _allow_diag_ui():
1157 return Response("Not found\n", 404)
1158
1159 app.logger.debug("SRC %s - getting intermediate for '%s'" % (reqid, source))
1160
1161 # Remember that app.diag is a property that can involve some real expense
1162 # to compute -- we don't want to call it more than once here, so we cache
1163 # its value.
1164 diag = app.diag
1165
1166 method = request.args.get("method", None)
1167 resource = request.args.get("resource", None)
1168
1169 estats = app.estatsmgr.get_stats()
1170 result = diag.lookup(request, source, estats)
1171
1172 if app.verbose:
1173 app.logger.debug("RESULT %s" % dump_json(result, pretty=True))
1174
1175 ddict = collect_errors_and_notices(request, reqid, "detail %s" % source, diag)
1176
1177 tvars = dict(
1178 system=system_info(app),
1179 envoy_status=envoy_status(estats),
1180 loginfo=app.estatsmgr.loginfo,
1181 notices=app.notices.notices,
1182 method=method,
1183 resource=resource,
1184 **result,
1185 **ddict,
1186 )
1187
1188 if request.args.get("json", None):
1189 key = request.args.get("filter", None)
1190
1191 if key:
1192 return jsonify(tvars.get(key, None))
1193 else:
1194 return jsonify(tvars)
1195 else:
1196 app.check_scout("detail: %s" % source)
1197 return Response(render_template("diag.html", **tvars))
1198
1199
1200@app.template_filter("sort_by_key")
1201def sort_by_key(objects):
1202 return sorted(objects, key=lambda x: x["key"])
1203
1204
1205@app.template_filter("pretty_json")
1206def pretty_json(obj):
1207 if isinstance(obj, dict):
1208 obj = dict(**obj)
1209
1210 keys_to_drop = [key for key in obj.keys() if key.startswith("_")]
1211
1212 for key in keys_to_drop:
1213 del obj[key]
1214
1215 return dump_json(obj, pretty=True)
1216
1217
1218@app.template_filter("sort_clusters_by_service")
1219def sort_clusters_by_service(clusters):
1220 return sorted(clusters, key=lambda x: x["service"])
1221 # return sorted([ c for c in clusters.values() ], key=lambda x: x['service'])
1222
1223
1224@app.template_filter("source_lookup")
1225def source_lookup(name, sources):
1226 app.logger.debug("%s => sources %s" % (name, sources))
1227
1228 source = sources.get(name, {})
1229
1230 app.logger.debug("%s => source %s" % (name, source))
1231
1232 return source.get("_source", name)
1233
1234
1235@app.route("/metrics", methods=["GET"])
1236@standard_handler
1237def get_prometheus_metrics(*args, **kwargs):
1238 # Envoy metrics
1239 envoy_metrics = app.estatsmgr.get_prometheus_stats()
1240
1241 # Ambassador OSS metrics
1242 ambassador_metrics = generate_latest(registry=app.metrics_registry).decode("utf-8")
1243
1244 # Extra metrics endpoint
1245 extra_metrics_content = ""
1246 if app.metrics_endpoint and app.ir and app.ir.edge_stack_allowed:
1247 try:
1248 response = requests.get(app.metrics_endpoint)
1249 if response.status_code == 200:
1250 extra_metrics_content = response.text
1251 except Exception as e:
1252 app.logger.error("could not get metrics_endpoint: %s" % e)
1253
1254 return Response(
1255 "".join([envoy_metrics, ambassador_metrics, extra_metrics_content]).encode("utf-8"),
1256 200,
1257 mimetype="text/plain",
1258 )
1259
1260
1261def bool_fmt(b: bool) -> str:
1262 return "T" if b else "F"
1263
1264
1265class StatusInfo:
1266 def __init__(self) -> None:
1267 self.status = True
1268 self.specifics: List[Tuple[bool, str]] = []
1269
1270 def failure(self, message: str) -> None:
1271 self.status = False
1272 self.specifics.append((False, message))
1273
1274 def OK(self, message: str) -> None:
1275 self.specifics.append((True, message))
1276
1277 def to_dict(self) -> Dict[str, Union[bool, List[Tuple[bool, str]]]]:
1278 return {"status": self.status, "specifics": self.specifics}
1279
1280
1281class SystemStatus:
1282 def __init__(self) -> None:
1283 self.status: Dict[str, StatusInfo] = {}
1284
1285 def failure(self, key: str, message: str) -> None:
1286 self.info_for_key(key).failure(message)
1287
1288 def OK(self, key: str, message: str) -> None:
1289 self.info_for_key(key).OK(message)
1290
1291 def info_for_key(self, key) -> StatusInfo:
1292 if key not in self.status:
1293 self.status[key] = StatusInfo()
1294
1295 return self.status[key]
1296
1297 def to_dict(self) -> Dict[str, Dict[str, Union[bool, List[Tuple[bool, str]]]]]:
1298 return {key: info.to_dict() for key, info in self.status.items()}
1299
1300
1301class KubeStatus:
1302 pool: concurrent.futures.ProcessPoolExecutor
1303
1304 def __init__(self, app) -> None:
1305 self.app = app
1306 self.logger = app.logger
1307 self.live: Dict[str, bool] = {}
1308 self.current_status: Dict[str, str] = {}
1309 self.pool = concurrent.futures.ProcessPoolExecutor(max_workers=5)
1310
1311 def mark_live(self, kind: str, name: str, namespace: str) -> None:
1312 key = f"{kind}/{name}.{namespace}"
1313
1314 # self.logger.debug(f"KubeStatus MASTER {os.getpid()}: mark_live {key}")
1315 self.live[key] = True
1316
1317 def prune(self) -> None:
1318 drop: List[str] = []
1319
1320 for key in self.current_status.keys():
1321 if not self.live.get(key, False):
1322 drop.append(key)
1323
1324 for key in drop:
1325 # self.logger.debug(f"KubeStatus MASTER {os.getpid()}: prune {key}")
1326 del self.current_status[key]
1327
1328 self.live = {}
1329
1330 def post(self, kind: str, name: str, namespace: str, text: str) -> None:
1331 key = f"{kind}/{name}.{namespace}"
1332 extant = self.current_status.get(key, None)
1333
1334 if extant == text:
1335 # self.logger.info(f"KubeStatus MASTER {os.getpid()}: {key} == {text}")
1336 pass
1337 else:
1338 # self.logger.info(f"KubeStatus MASTER {os.getpid()}: {key} needs {text}")
1339
1340 # For now we're going to assume that this works.
1341 self.current_status[key] = text
1342 f = self.pool.submit(kubestatus_update, kind, name, namespace, text)
1343 f.add_done_callback(kubestatus_update_done)
1344
1345
1346# The KubeStatusNoMappings class clobbers the mark_live() method of the
1347# KubeStatus class, so that updates to Mappings don't actually have any
1348# effect, but updates to Ingress (for example) do.
1349class KubeStatusNoMappings(KubeStatus):
1350 def mark_live(self, kind: str, name: str, namespace: str) -> None:
1351 pass
1352
1353 def post(self, kind: str, name: str, namespace: str, text: str) -> None:
1354 # There's a path (via IRBaseMapping.check_status) where a Mapping
1355 # can be added directly to ir.k8s_status_updates, which will come
1356 # straight here without mark_live being involved -- so short-circuit
1357 # here for Mappings, too.
1358
1359 if kind == "Mapping":
1360 return
1361
1362 super().post(kind, name, namespace, text)
1363
1364
1365def kubestatus_update(kind: str, name: str, namespace: str, text: str) -> str:
1366 cmd = [
1367 "kubestatus",
1368 "--cache-dir",
1369 "/tmp/client-go-http-cache",
1370 kind,
1371 name,
1372 "-n",
1373 namespace,
1374 "-u",
1375 "/dev/fd/0",
1376 ]
1377 # print(f"KubeStatus UPDATE {os.getpid()}: running command: {cmd}")
1378
1379 try:
1380 rc = subprocess.run(
1381 cmd,
1382 input=text.encode("utf-8"),
1383 stdout=subprocess.PIPE,
1384 stderr=subprocess.STDOUT,
1385 timeout=5,
1386 )
1387 if rc.returncode == 0:
1388 return f"{name}.{namespace}: update OK"
1389 else:
1390 return f"{name}.{namespace}: error {rc.returncode}"
1391
1392 except subprocess.TimeoutExpired as e:
1393 return f"{name}.{namespace}: timed out\n\n{e.output}"
1394
1395
1396def kubestatus_update_done(f: concurrent.futures.Future) -> None:
1397 # print(f"KubeStatus DONE {os.getpid()}: result {f.result()}")
1398 pass
1399
1400
1401class AmbassadorEventWatcher(threading.Thread):
1402 # The key for 'Actions' is chimed - chimed_ok - env_good. This will make more sense
1403 # if you read through the _load_ir method.
1404
1405 Actions = {
1406 "F-F-F": ("unhealthy", True), # make sure the first chime always gets out
1407 "F-F-T": ("now-healthy", True), # make sure the first chime always gets out
1408 "F-T-F": ("now-unhealthy", True), # this is actually impossible
1409 "F-T-T": ("healthy", True), # this is actually impossible
1410 "T-F-F": ("unhealthy", False),
1411 "T-F-T": ("now-healthy", True),
1412 "T-T-F": ("now-unhealthy", True),
1413 "T-T-T": ("update", False),
1414 }
1415
1416 reCompressed = re.compile(r"-\d+$")
1417
1418 def __init__(self, app: DiagApp) -> None:
1419 super().__init__(name="AEW", daemon=True)
1420 self.app = app
1421 self.logger = self.app.logger
1422 self.events: queue.Queue = queue.Queue()
1423
1424 self.chimed = False # Have we ever sent a chime about the environment?
1425 self.last_chime = False # What was the status of our last chime? (starts as False)
1426 self.env_good = False # Is our environment currently believed to be OK?
1427 self.failure_list: List[str] = [
1428 "unhealthy at boot"
1429 ] # What's making our environment not OK?
1430
1431 def post(
1432 self, cmd: str, arg: Optional[Union[str, Tuple[str, Optional[IR]]]]
1433 ) -> Tuple[int, str]:
1434 rqueue: queue.Queue = queue.Queue()
1435
1436 self.events.put((cmd, arg, rqueue))
1437
1438 return rqueue.get()
1439
1440 def update_estats(self) -> None:
1441 self.app.estatsmgr.update()
1442
1443 def run(self):
1444 self.logger.info("starting Scout checker and timer logger")
1445 self.app.scout_checker = PeriodicTrigger(
1446 lambda: self.check_scout("checkin"), period=86400
1447 ) # Yup, one day.
1448 self.app.timer_logger = PeriodicTrigger(self.app.post_timer_event, period=1)
1449
1450 self.logger.info("starting event watcher")
1451
1452 while True:
1453 cmd, arg, rqueue = self.events.get()
1454 # self.logger.info("EVENT: %s" % cmd)
1455
1456 if cmd == "CONFIG_FS":
1457 try:
1458 self.load_config_fs(rqueue, arg)
1459 except Exception as e:
1460 self.logger.error("could not reconfigure: %s" % e)
1461 self.logger.exception(e)
1462 self._respond(rqueue, 500, "configuration from filesystem failed")
1463 elif cmd == "CONFIG":
1464 version, url = arg
1465
1466 try:
1467 if version == "watt":
1468 self.load_config_watt(rqueue, url)
1469 else:
1470 raise RuntimeError("config from %s not supported" % version)
1471 except Exception as e:
1472 self.logger.error("could not reconfigure: %s" % e)
1473 self.logger.exception(e)
1474 self._respond(rqueue, 500, "configuration failed")
1475 elif cmd == "SCOUT":
1476 try:
1477 self._respond(rqueue, 200, "checking Scout")
1478 self.check_scout(*arg)
1479 except Exception as e:
1480 self.logger.error("could not reconfigure: %s" % e)
1481 self.logger.exception(e)
1482 self._respond(rqueue, 500, "scout check failed")
1483 elif cmd == "TIMER":
1484 try:
1485 self._respond(rqueue, 200, "done")
1486 self.app.check_timers()
1487 except Exception as e:
1488 self.logger.error("could not check timers? %s" % e)
1489 self.logger.exception(e)
1490 else:
1491 self.logger.error(f"unknown event type: '{cmd}' '{arg}'")
1492 self._respond(rqueue, 400, f"unknown event type '{cmd}' '{arg}'")
1493
1494 def _respond(self, rqueue: queue.Queue, status: int, info="") -> None:
1495 # self.logger.debug("responding to query with %s %s" % (status, info))
1496 rqueue.put((status, info))
1497
1498 # load_config_fs reconfigures from the filesystem. It's _mostly_ legacy
1499 # code, but not entirely, since Docker demo mode still uses it.
1500 #
1501 # BE CAREFUL ABOUT STOPPING THE RECONFIGURATION TIMER ONCE IT IS STARTED.
1502 def load_config_fs(self, rqueue: queue.Queue, path: str) -> None:
1503 self.logger.debug("loading configuration from disk: %s" % path)
1504
1505 # The "path" here can just be a path, but it can also be a command for testing,
1506 # if the user has chosen to allow that.
1507
1508 if self.app.allow_fs_commands and (":" in path):
1509 pfx, rest = path.split(":", 1)
1510
1511 if pfx.lower() == "cmd":
1512 fields = rest.split(":", 1)
1513
1514 cmd = fields[0].upper()
1515
1516 args = fields[1:] if (len(fields) > 1) else None
1517
1518 if cmd.upper() == "CHIME":
1519 self.logger.info("CMD: Chiming")
1520
1521 self.chime()
1522
1523 self._respond(rqueue, 200, "Chimed")
1524 elif cmd.upper() == "CHIME_RESET":
1525 self.chimed = False
1526 self.last_chime = False
1527 self.env_good = False
1528
1529 self.app.scout.reset_events()
1530 self.app.scout.report(mode="boot", action="boot1", no_cache=True)
1531
1532 self.logger.info("CMD: Reset chime state")
1533 self._respond(rqueue, 200, "CMD: Reset chime state")
1534 elif cmd.upper() == "SCOUT_CACHE_RESET":
1535 self.app.scout.reset_cache_time()
1536
1537 self.logger.info("CMD: Reset Scout cache time")
1538 self._respond(rqueue, 200, "CMD: Reset Scout cache time")
1539 elif cmd.upper() == "ENV_OK":
1540 self.env_good = True
1541 self.failure_list = []
1542
1543 self.logger.info("CMD: Marked environment good")
1544 self._respond(rqueue, 200, "CMD: Marked environment good")
1545 elif cmd.upper() == "ENV_BAD":
1546 self.env_good = False
1547 self.failure_list = ["failure forced"]
1548
1549 self.logger.info("CMD: Marked environment bad")
1550 self._respond(rqueue, 200, "CMD: Marked environment bad")
1551 else:
1552 self.logger.info(f'CMD: no such command "{cmd}"')
1553 self._respond(rqueue, 400, f'CMD: no such command "{cmd}"')
1554
1555 return
1556 else:
1557 self.logger.info(f'CONFIG_FS: invalid prefix "{pfx}"')
1558 self._respond(rqueue, 400, f'CONFIG_FS: invalid prefix "{pfx}"')
1559
1560 return
1561
1562 # OK, we're starting a reconfiguration. BE CAREFUL TO STOP THE TIMER
1563 # BEFORE YOU RESPOND TO THE CALLER.
1564 self.app.config_timer.start()
1565
1566 snapshot = re.sub(r"[^A-Za-z0-9_-]", "_", path)
1567 scc = FSSecretHandler(app.logger, path, app.snapshot_path, "0")
1568
1569 with self.app.fetcher_timer:
1570 aconf = Config()
1571 fetcher = ResourceFetcher(app.logger, aconf)
1572 fetcher.load_from_filesystem(path, k8s=app.k8s, recurse=True)
1573
1574 if not fetcher.elements:
1575 self.logger.debug("no configuration resources found at %s" % path)
1576 # Don't bail from here -- go ahead and reload the IR.
1577 #
1578 # XXX This is basically historical logic, honestly. But if you try
1579 # to respond from here and bail, STOP THE RECONFIGURATION TIMER.
1580
1581 self._load_ir(rqueue, aconf, fetcher, scc, snapshot)
1582
1583 # load_config_watt reconfigures from the filesystem. It's the one true way of
1584 # reconfiguring these days.
1585 #
1586 # BE CAREFUL ABOUT STOPPING THE RECONFIGURATION TIMER ONCE IT IS STARTED.
1587 def load_config_watt(self, rqueue: queue.Queue, url: str):
1588 snapshot = url.split("/")[-1]
1589 ss_path = os.path.join(app.snapshot_path, "snapshot-tmp.yaml")
1590
1591 # OK, we're starting a reconfiguration. BE CAREFUL TO STOP THE TIMER
1592 # BEFORE YOU RESPOND TO THE CALLER.
1593 self.app.config_timer.start()
1594
1595 self.logger.debug("copying configuration: watt, %s to %s" % (url, ss_path))
1596
1597 # Grab the serialization, and save it to disk too.
1598 serialization = load_url_contents(self.logger, url, stream2=open(ss_path, "w"))
1599
1600 if not serialization:
1601 self.logger.debug("no data loaded from snapshot %s" % snapshot)
1602 # We never used to return here. I'm not sure if that's really correct?
1603 #
1604 # IF YOU CHANGE THIS, BE CAREFUL TO STOP THE RECONFIGURATION TIMER.
1605
1606 # Weirdly, we don't need a special WattSecretHandler: parse_watt knows how to handle
1607 # the secrets that watt sends.
1608 scc = SecretHandler(app.logger, url, app.snapshot_path, snapshot)
1609
1610 # OK. Time the various configuration sections separately.
1611
1612 with self.app.fetcher_timer:
1613 aconf = Config()
1614 fetcher = ResourceFetcher(app.logger, aconf)
1615
1616 if serialization:
1617 fetcher.parse_watt(serialization)
1618
1619 if not fetcher.elements:
1620 self.logger.debug("no configuration found in snapshot %s" % snapshot)
1621
1622 # Don't actually bail here. If they send over a valid config that happens
1623 # to have nothing for us, it's still a legit config.
1624 #
1625 # IF YOU CHANGE THIS, BE CAREFUL TO STOP THE RECONFIGURATION TIMER.
1626
1627 self._load_ir(rqueue, aconf, fetcher, scc, snapshot)
1628
1629 # _load_ir is where the heavy lifting of a reconfigure happens.
1630 #
1631 # AT THE POINT OF ENTRY, THE RECONFIGURATION TIMER IS RUNNING. DO NOT LEAVE
1632 # THIS METHOD WITHOUT STOPPING THE RECONFIGURATION TIMER.
1633 def _load_ir(
1634 self,
1635 rqueue: queue.Queue,
1636 aconf: Config,
1637 fetcher: ResourceFetcher,
1638 secret_handler: SecretHandler,
1639 snapshot: str,
1640 ) -> None:
1641 with self.app.aconf_timer:
1642 aconf.load_all(fetcher.sorted())
1643
1644 # TODO(Flynn): This is an awful hack. Have aconf.load(fetcher) that does
1645 # this correctly.
1646 #
1647 # I'm not doing this at this moment because aconf.load_all() is called in a
1648 # lot of places, and I don't want to destablize 2.2.2.
1649 aconf.load_invalid(fetcher)
1650
1651 aconf_path = os.path.join(app.snapshot_path, "aconf-tmp.json")
1652 open(aconf_path, "w").write(aconf.as_json())
1653
1654 # OK. What kind of reconfiguration are we doing?
1655 config_type, reset_cache, invalidate_groups_for = IR.check_deltas(
1656 self.logger, fetcher, self.app.cache
1657 )
1658
1659 if reset_cache:
1660 self.logger.debug("RESETTING CACHE")
1661 self.app.cache = Cache(self.logger)
1662
1663 with self.app.ir_timer:
1664 ir = IR(
1665 aconf,
1666 secret_handler=secret_handler,
1667 invalidate_groups_for=invalidate_groups_for,
1668 cache=self.app.cache,
1669 )
1670
1671 ir_path = os.path.join(app.snapshot_path, "ir-tmp.json")
1672 open(ir_path, "w").write(ir.as_json())
1673
1674 with self.app.econf_timer:
1675 self.logger.debug(
1676 "generating envoy configuration with api version %s" % Config.envoy_api_version
1677 )
1678 econf = EnvoyConfig.generate(ir, Config.envoy_api_version, cache=self.app.cache)
1679
1680 # DON'T generate the Diagnostics here, because that turns out to be expensive.
1681 # Instead, we'll just reset app.diag to None, then generate it on-demand when
1682 # we need it.
1683 #
1684 # DO go ahead and split the Envoy config into its components for later, though.
1685 bootstrap_config, ads_config, clustermap = econf.split_config()
1686
1687 # OK. Assume that the Envoy config is valid...
1688 econf_is_valid = True
1689 econf_bad_reason = ""
1690
1691 # ...then look for reasons it's not valid.
1692 if not econf.has_listeners():
1693 # No listeners == something in the Ambassador config is totally horked.
1694 # Probably this is the user not defining any Hosts that match
1695 # the Listeners in the system.
1696 #
1697 # As it happens, Envoy is OK running a config with no listeners, and it'll
1698 # answer on port 8001 for readiness checks, so... log a notice, but run with it.
1699 self.logger.warning(
1700 "No active listeners at all; check your Listener and Host configuration"
1701 )
1702 elif not self.validate_envoy_config(
1703 ir, config=ads_config, retries=self.app.validation_retries
1704 ):
1705 # Invalid Envoy config probably indicates a bug in Emissary itself. Sigh.
1706 econf_is_valid = False
1707 econf_bad_reason = "invalid envoy configuration generated"
1708
1709 # OK. Is the config invalid?
1710 if not econf_is_valid:
1711 # BZzzt. Don't post this update.
1712 self.logger.info(
1713 "no update performed (%s), continuing with current configuration..."
1714 % econf_bad_reason
1715 )
1716
1717 # Don't use app.check_scout; it will deadlock.
1718 self.check_scout("attempted bad update")
1719
1720 # DO stop the reconfiguration timer before leaving.
1721 self.app.config_timer.stop()
1722 self._respond(
1723 rqueue, 500, "ignoring (%s) in snapshot %s" % (econf_bad_reason, snapshot)
1724 )
1725 return
1726
1727 snapcount = int(os.environ.get("AMBASSADOR_SNAPSHOT_COUNT", "4"))
1728 snaplist: List[Tuple[str, str]] = []
1729
1730 if snapcount > 0:
1731 self.logger.debug("rotating snapshots for snapshot %s" % snapshot)
1732
1733 # If snapcount is 4, this range statement becomes range(-4, -1)
1734 # which gives [ -4, -3, -2 ], which the list comprehension turns
1735 # into [ ( "-3", "-4" ), ( "-2", "-3" ), ( "-1", "-2" ) ]...
1736 # which is the list of suffixes to rename to rotate the snapshots.
1737
1738 snaplist += [(str(x + 1), str(x)) for x in range(-1 * snapcount, -1)]
1739
1740 # After dealing with that, we need to rotate the current file into -1.
1741 snaplist.append(("", "-1"))
1742
1743 # Whether or not we do any rotation, we need to cycle in the '-tmp' file.
1744 snaplist.append(("-tmp", ""))
1745
1746 for from_suffix, to_suffix in snaplist:
1747 for fmt in ["aconf{}.json", "econf{}.json", "ir{}.json", "snapshot{}.yaml"]:
1748 from_path = os.path.join(app.snapshot_path, fmt.format(from_suffix))
1749 to_path = os.path.join(app.snapshot_path, fmt.format(to_suffix))
1750
1751 # Make sure we don't leave this method on error! The reconfiguration
1752 # timer is still running, but also, the snapshots are a debugging aid:
1753 # if we can't rotate them, meh, whatever.
1754
1755 try:
1756 self.logger.debug("rotate: %s -> %s" % (from_path, to_path))
1757 os.rename(from_path, to_path)
1758 except IOError as e:
1759 self.logger.debug("skip %s -> %s: %s" % (from_path, to_path, e))
1760 except Exception as e:
1761 self.logger.debug("could not rename %s -> %s: %s" % (from_path, to_path, e))
1762
1763 app.latest_snapshot = snapshot
1764 self.logger.debug("saving Envoy configuration for snapshot %s" % snapshot)
1765
1766 with open(app.bootstrap_path, "w") as output:
1767 output.write(dump_json(bootstrap_config, pretty=True))
1768
1769 with open(app.ads_path, "w") as output:
1770 output.write(dump_json(ads_config, pretty=True))
1771
1772 with open(app.clustermap_path, "w") as output:
1773 output.write(dump_json(clustermap, pretty=True))
1774
1775 with app.config_lock:
1776 app.aconf = aconf
1777 app.ir = ir
1778 app.econf = econf
1779
1780 # Force app.diag to None so that it'll be regenerated on-demand.
1781 app.diag = None
1782
1783 # We're finally done with the whole configuration process.
1784 self.app.config_timer.stop()
1785
1786 if app.kick:
1787 self.logger.debug("running '%s'" % app.kick)
1788 os.system(app.kick)
1789 elif app.ambex_pid != 0:
1790 self.logger.debug("notifying PID %d ambex" % app.ambex_pid)
1791 os.kill(app.ambex_pid, signal.SIGHUP)
1792
1793 # don't worry about TCPMappings yet
1794 mappings = app.aconf.get_config("mappings")
1795
1796 if mappings:
1797 for mapping_name, mapping in mappings.items():
1798 app.kubestatus.mark_live(
1799 "Mapping", mapping_name, mapping.get("namespace", Config.ambassador_namespace)
1800 )
1801
1802 app.kubestatus.prune()
1803
1804 if app.ir.k8s_status_updates:
1805 update_count = 0
1806
1807 for name in app.ir.k8s_status_updates.keys():
1808 update_count += 1
1809 # Strip off any namespace in the name.
1810 resource_name = name.split(".", 1)[0]
1811 kind, namespace, update = app.ir.k8s_status_updates[name]
1812 text = dump_json(update)
1813
1814 # self.logger.debug(f"K8s status update: {kind} {resource_name}.{namespace}, {text}...")
1815
1816 app.kubestatus.post(kind, resource_name, namespace, text)
1817
1818 group_count = len(app.ir.groups)
1819 cluster_count = len(app.ir.clusters)
1820 listener_count = len(app.ir.listeners)
1821 service_count = len(app.ir.services)
1822
1823 self._respond(
1824 rqueue, 200, "configuration updated (%s) from snapshot %s" % (config_type, snapshot)
1825 )
1826
1827 self.logger.info(
1828 "configuration updated (%s) from snapshot %s (S%d L%d G%d C%d)"
1829 % (config_type, snapshot, service_count, listener_count, group_count, cluster_count)
1830 )
1831
1832 # Remember that we've reconfigured.
1833 self.app.reconf_stats.mark(config_type)
1834
1835 if app.health_checks and not app.stats_updater:
1836 app.logger.debug("starting Envoy status updater")
1837 app.stats_updater = PeriodicTrigger(app.watcher.update_estats, period=5)
1838
1839 # Check our environment...
1840 self.check_environment()
1841
1842 self.chime()
1843
1844 def chime(self):
1845 # In general, our reports here should be action "update", and they should honor the
1846 # Scout cache, but we need to tweak that depending on whether we've done this before
1847 # and on whether the environment looks OK.
1848
1849 already_chimed = bool_fmt(self.chimed)
1850 was_ok = bool_fmt(self.last_chime)
1851 now_ok = bool_fmt(self.env_good)
1852
1853 # Poor man's state machine...
1854 action_key = f"{already_chimed}-{was_ok}-{now_ok}"
1855 action, no_cache = AmbassadorEventWatcher.Actions[action_key]
1856
1857 self.logger.debug(f"CHIME: {action_key}")
1858
1859 chime_args = {"no_cache": no_cache, "failures": self.failure_list}
1860
1861 if self.app.report_action_keys:
1862 chime_args["action_key"] = action_key
1863
1864 # Don't use app.check_scout; it will deadlock.
1865 self.check_scout(action, **chime_args)
1866
1867 # Remember that we have now chimed...
1868 self.chimed = True
1869
1870 # ...and remember what we sent for that chime.
1871 self.last_chime = self.env_good
1872
1873 def check_environment(self, ir: Optional[IR] = None) -> None:
1874 env_good = True
1875 chime_failures = {}
1876 env_status = SystemStatus()
1877
1878 error_count = 0
1879 tls_count = 0
1880 mapping_count = 0
1881
1882 if not ir:
1883 ir = app.ir
1884
1885 if not ir:
1886 chime_failures["no config loaded"] = True
1887 env_good = False
1888 else:
1889 if not ir.aconf:
1890 chime_failures["completely empty config"] = True
1891 env_good = False
1892 else:
1893 for err_key, err_list in ir.aconf.errors.items():
1894 if err_key == "-global-":
1895 err_key = ""
1896
1897 for err in err_list:
1898 error_count += 1
1899 err_text = err["error"]
1900
1901 self.app.logger.info(f"error {err_key} {err_text}")
1902
1903 if err_text.find("CRD") >= 0:
1904 if err_text.find("core") >= 0:
1905 chime_failures["core CRDs"] = True
1906 env_status.failure("CRDs", "Core CRD type definitions are missing")
1907 else:
1908 chime_failures["other CRDs"] = True
1909 env_status.failure(
1910 "CRDs", "Resolver CRD type definitions are missing"
1911 )
1912
1913 env_good = False
1914 elif err_text.find("TLS") >= 0:
1915 chime_failures["TLS errors"] = True
1916 env_status.failure("TLS", err_text)
1917
1918 env_good = False
1919
1920 for context in ir.tls_contexts:
1921 if context:
1922 tls_count += 1
1923 break
1924
1925 for group in ir.groups.values():
1926 for mapping in group.mappings:
1927 pfx = mapping.get("prefix", None)
1928 name = mapping.get("name", None)
1929
1930 if pfx:
1931 if not pfx.startswith("/ambassador/v0") or not name.startswith("internal_"):
1932 mapping_count += 1
1933
1934 if error_count:
1935 env_status.failure(
1936 "Error check",
1937 f'{error_count} total error{"" if (error_count == 1) else "s"} logged',
1938 )
1939 env_good = False
1940 else:
1941 env_status.OK("Error check", "No errors logged")
1942
1943 if tls_count:
1944 env_status.OK(
1945 "TLS", f'{tls_count} TLSContext{" is" if (tls_count == 1) else "s are"} active'
1946 )
1947 else:
1948 chime_failures["no TLS contexts"] = True
1949 env_status.failure("TLS", "No TLSContexts are active")
1950
1951 env_good = False
1952
1953 if mapping_count:
1954 env_status.OK(
1955 "Mappings",
1956 f'{mapping_count} Mapping{" is" if (mapping_count == 1) else "s are"} active',
1957 )
1958 else:
1959 chime_failures["no Mappings"] = True
1960 env_status.failure("Mappings", "No Mappings are active")
1961 env_good = False
1962
1963 failure_list: List[str] = []
1964
1965 if not env_good:
1966 failure_list = list(sorted(chime_failures.keys()))
1967
1968 self.env_good = env_good
1969 self.env_status = env_status
1970 self.failure_list = failure_list
1971
1972 def check_scout(
1973 self,
1974 what: str,
1975 no_cache: Optional[bool] = False,
1976 ir: Optional[IR] = None,
1977 failures: Optional[List[str]] = None,
1978 action_key: Optional[str] = None,
1979 ) -> None:
1980 now = datetime.datetime.now()
1981 uptime = now - boot_time
1982 hr_uptime = td_format(uptime)
1983
1984 if not ir:
1985 ir = app.ir
1986
1987 self.app.notices.reset()
1988
1989 scout_args = {"uptime": int(uptime.total_seconds()), "hr_uptime": hr_uptime}
1990
1991 if failures:
1992 scout_args["failures"] = failures
1993
1994 if action_key:
1995 scout_args["action_key"] = action_key
1996
1997 if ir:
1998 self.app.logger.debug("check_scout: we have an IR")
1999
2000 if not os.environ.get("AMBASSADOR_DISABLE_FEATURES", None):
2001 self.app.logger.debug("check_scout: including features")
2002 feat = ir.features()
2003
2004 # Include features about the cache and incremental reconfiguration,
2005 # too.
2006
2007 if self.app.cache is not None:
2008 # Fast reconfigure is on. Supply the real info.
2009 feat["frc_enabled"] = True
2010 feat["frc_cache_hits"] = self.app.cache.hits
2011 feat["frc_cache_misses"] = self.app.cache.misses
2012 feat["frc_inv_calls"] = self.app.cache.invalidate_calls
2013 feat["frc_inv_objects"] = self.app.cache.invalidated_objects
2014 else:
2015 # Fast reconfigure is off.
2016 feat["frc_enabled"] = False
2017
2018 # Whether the cache is on or off, we can talk about reconfigurations.
2019 feat["frc_incr_count"] = self.app.reconf_stats.counts["incremental"]
2020 feat["frc_complete_count"] = self.app.reconf_stats.counts["complete"]
2021 feat["frc_check_count"] = self.app.reconf_stats.checks
2022 feat["frc_check_errors"] = self.app.reconf_stats.errors
2023
2024 request_data = app.estatsmgr.get_stats().requests
2025
2026 if request_data:
2027 self.app.logger.debug("check_scout: including requests")
2028
2029 for rkey in request_data.keys():
2030 cur = request_data[rkey]
2031 prev = app.last_request_info.get(rkey, 0)
2032 feat[f"request_{rkey}_count"] = max(cur - prev, 0)
2033
2034 lrt = app.last_request_time or boot_time
2035 since_lrt = now - lrt
2036 elapsed = since_lrt.total_seconds()
2037 hr_elapsed = td_format(since_lrt)
2038
2039 app.last_request_time = now
2040 app.last_request_info = request_data
2041
2042 feat["request_elapsed"] = elapsed
2043 feat["request_hr_elapsed"] = hr_elapsed
2044
2045 scout_args["features"] = feat
2046
2047 scout_result = self.app.scout.report(
2048 mode="diagd", action=what, no_cache=no_cache, **scout_args
2049 )
2050 scout_notices = scout_result.pop("notices", [])
2051
2052 global_loglevel = self.app.logger.getEffectiveLevel()
2053
2054 self.app.logger.debug(f"Scout section: global loglevel {global_loglevel}")
2055
2056 for notice in scout_notices:
2057 notice_level_name = notice.get("level") or "INFO"
2058 notice_level = logging.getLevelName(notice_level_name)
2059
2060 if notice_level >= global_loglevel:
2061 self.app.logger.debug(f"Scout section: include {notice}")
2062 self.app.notices.post(notice)
2063 else:
2064 self.app.logger.debug(f"Scout section: skip {notice}")
2065
2066 self.app.logger.debug("Scout reports %s" % dump_json(scout_result))
2067 self.app.logger.debug("Scout notices: %s" % dump_json(scout_notices))
2068 self.app.logger.debug("App notices after scout: %s" % dump_json(app.notices.notices))
2069
2070 def validate_envoy_config(self, ir: IR, config, retries) -> bool:
2071 if self.app.no_envoy:
2072 self.app.logger.debug("Skipping validation")
2073 return True
2074
2075 # We want to keep the original config untouched
2076 validation_config = copy.deepcopy(config)
2077
2078 # Envoy fails to validate with @type field in envoy config, so removing that
2079 validation_config.pop("@type")
2080
2081 if os.environ.get("AMBASSADOR_DEBUG_CLUSTER_CONFIG", "false").lower() == "true":
2082 vconf_clusters = validation_config["static_resources"]["clusters"]
2083
2084 if len(vconf_clusters) > 10:
2085 vconf_clusters.append(copy.deepcopy(vconf_clusters[10]))
2086
2087 # Check for cluster-name weirdness.
2088 _v2_clusters = {}
2089 _problems = []
2090
2091 for name in sorted(ir.clusters.keys()):
2092 if AmbassadorEventWatcher.reCompressed.search(name):
2093 _problems.append(f"IR pre-compressed cluster {name}")
2094
2095 for cluster in validation_config["static_resources"]["clusters"]:
2096 name = cluster["name"]
2097
2098 if name in _v2_clusters:
2099 _problems.append(f"V2 dup cluster {name}")
2100 _v2_clusters[name] = True
2101
2102 if _problems:
2103 self.logger.error("ENVOY CONFIG PROBLEMS:\n%s", "\n".join(_problems))
2104 stamp = datetime.datetime.now().isoformat()
2105
2106 bad_snapshot = open(
2107 os.path.join(app.snapshot_path, "snapshot-tmp.yaml"), "r"
2108 ).read()
2109
2110 cache_dict: Dict[str, Any] = {}
2111 cache_links: Dict[str, Any] = {}
2112
2113 if self.app.cache:
2114 for k, c in self.app.cache.cache.items():
2115 v: Any = c[0]
2116
2117 if getattr(v, "as_dict", None):
2118 v = v.as_dict()
2119
2120 cache_dict[k] = v
2121
2122 cache_links = {k: list(v) for k, v in self.app.cache.links.items()}
2123
2124 bad_dict = {
2125 "ir": ir.as_dict(),
2126 "v2": config,
2127 "validation": validation_config,
2128 "problems": _problems,
2129 "snapshot": bad_snapshot,
2130 "cache": cache_dict,
2131 "links": cache_links,
2132 }
2133
2134 bad_dict_str = dump_json(bad_dict, pretty=True)
2135 with open(os.path.join(app.snapshot_path, f"problems-{stamp}.json"), "w") as output:
2136 output.write(bad_dict_str)
2137
2138 config_json = dump_json(validation_config, pretty=True)
2139
2140 econf_validation_path = os.path.join(app.snapshot_path, "econf-tmp.json")
2141
2142 with open(econf_validation_path, "w") as output:
2143 output.write(config_json)
2144
2145 command = [
2146 "envoy",
2147 "--service-node",
2148 "test-id",
2149 "--service-cluster",
2150 ir.ambassador_nodename,
2151 "--config-path",
2152 econf_validation_path,
2153 "--mode",
2154 "validate",
2155 ]
2156 if Config.envoy_api_version == "V2":
2157 command.extend(["--bootstrap-version", "2"])
2158
2159 v_exit = 0
2160 v_encoded = "".encode("utf-8")
2161
2162 # Try to validate the Envoy config. Short circuit and fall through
2163 # immediately on concrete success or failure, and retry (up to the
2164 # limit) on timeout.
2165 #
2166 # The default timeout is 5s, but this can be overridden in the Ambassador
2167 # module.
2168
2169 amod = ir.ambassador_module
2170 timeout = amod.envoy_validation_timeout if amod else IRAmbassador.default_validation_timeout
2171
2172 # If the timeout is zero, don't do the validation.
2173 if timeout == 0:
2174 self.logger.debug("not validating Envoy configuration since timeout is 0")
2175 return True
2176
2177 self.logger.debug(f"validating Envoy configuration with timeout {timeout}")
2178
2179 for retry in range(retries):
2180 try:
2181 v_encoded = subprocess.check_output(
2182 command, stderr=subprocess.STDOUT, timeout=timeout
2183 )
2184 v_exit = 0
2185 break
2186 except subprocess.CalledProcessError as e:
2187 v_exit = e.returncode
2188 v_encoded = e.output
2189 break
2190 except subprocess.TimeoutExpired as e:
2191 v_exit = 1
2192 v_encoded = e.output or "".encode("utf-8")
2193
2194 self.logger.warn(
2195 "envoy configuration validation timed out after {} seconds{}\n{}".format(
2196 timeout,
2197 ", retrying..." if retry < retries - 1 else "",
2198 v_encoded.decode("utf-8"),
2199 )
2200 )
2201
2202 # Don't break here; continue on to the next iteration of the loop.
2203
2204 if v_exit == 0:
2205 self.logger.debug(
2206 "successfully validated the resulting envoy configuration, continuing..."
2207 )
2208 return True
2209
2210 v_str = typecast(str, v_encoded)
2211
2212 try:
2213 v_str = v_encoded.decode("utf-8")
2214 except:
2215 pass
2216
2217 self.logger.error(
2218 "{}\ncould not validate the envoy configuration above after {} retries, failed with error \n{}\n(exit code {})\nAborting update...".format(
2219 config_json, retries, v_str, v_exit
2220 )
2221 )
2222 return False
2223
2224
2225class StandaloneApplication(gunicorn.app.base.BaseApplication):
2226 def __init__(self, app, options=None):
2227 self.options = options or {}
2228 self.application = app
2229 super(StandaloneApplication, self).__init__()
2230
2231 # Boot chime. This is basically the earliest point at which we can consider an Ambassador
2232 # to be "running".
2233 scout_result = self.application.scout.report(mode="boot", action="boot1", no_cache=True)
2234 self.application.logger.debug(f"BOOT: Scout result {dump_json(scout_result)}")
2235 self.application.logger.info(f"Ambassador {__version__} booted")
2236
2237 def load_config(self):
2238 config = dict(
2239 [
2240 (key, value)
2241 for key, value in self.options.items()
2242 if key in self.cfg.settings and value is not None
2243 ]
2244 )
2245
2246 for key, value in config.items():
2247 self.cfg.set(key.lower(), value)
2248
2249 def load(self):
2250 # This is a little weird, but whatever.
2251 self.application.watcher = AmbassadorEventWatcher(self.application)
2252 self.application.watcher.start()
2253
2254 if self.application.config_path:
2255 self.application.watcher.post("CONFIG_FS", self.application.config_path)
2256
2257 return self.application
2258
2259
2260@click.command()
2261@click.argument("snapshot-path", type=click.Path(), required=False)
2262@click.argument("bootstrap-path", type=click.Path(), required=False)
2263@click.argument("ads-path", type=click.Path(), required=False)
2264@click.option(
2265 "--config-path",
2266 type=click.Path(),
2267 help="Optional configuration path to scan for Ambassador YAML files",
2268)
2269@click.option(
2270 "--k8s",
2271 is_flag=True,
2272 help="If True, assume config_path contains Kubernetes resources (only relevant with config_path)",
2273)
2274@click.option(
2275 "--ambex-pid",
2276 type=int,
2277 default=0,
2278 help="Optional PID to signal with HUP after updating Envoy configuration",
2279 show_default=True,
2280)
2281@click.option("--kick", type=str, help="Optional command to run after updating Envoy configuration")
2282@click.option(
2283 "--banner-endpoint",
2284 type=str,
2285 default="http://127.0.0.1:8500/banner",
2286 help="Optional endpoint of extra banner to include",
2287 show_default=True,
2288)
2289@click.option(
2290 "--metrics-endpoint",
2291 type=str,
2292 default="http://127.0.0.1:8500/metrics",
2293 help="Optional endpoint of extra prometheus metrics to include",
2294 show_default=True,
2295)
2296@click.option("--no-checks", is_flag=True, help="If True, don't do Envoy-cluster health checking")
2297@click.option("--no-envoy", is_flag=True, help="If True, don't interact with Envoy at all")
2298@click.option("--reload", is_flag=True, help="If True, run Flask in debug mode for live reloading")
2299@click.option("--debug", is_flag=True, help="If True, do debug logging")
2300@click.option(
2301 "--dev-magic",
2302 is_flag=True,
2303 help="If True, override a bunch of things for Datawire dev-loop stuff",
2304)
2305@click.option("--verbose", is_flag=True, help="If True, do really verbose debug logging")
2306@click.option(
2307 "--workers", type=int, help="Number of workers; default is based on the number of CPUs present"
2308)
2309@click.option("--host", type=str, help="Interface on which to listen")
2310@click.option("--port", type=int, default=-1, help="Port on which to listen", show_default=True)
2311@click.option("--notices", type=click.Path(), help="Optional file to read for local notices")
2312@click.option(
2313 "--validation-retries",
2314 type=int,
2315 default=5,
2316 help="Number of times to retry Envoy configuration validation after a timeout",
2317 show_default=True,
2318)
2319@click.option(
2320 "--allow-fs-commands",
2321 is_flag=True,
2322 help="If true, allow CONFIG_FS to support debug/testing commands",
2323)
2324@click.option(
2325 "--local-scout",
2326 is_flag=True,
2327 help="Don't talk to remote Scout at all; keep everything purely local",
2328)
2329@click.option("--report-action-keys", is_flag=True, help="Report action keys when chiming")
2330def main(
2331 snapshot_path=None,
2332 bootstrap_path=None,
2333 ads_path=None,
2334 *,
2335 dev_magic=False,
2336 config_path=None,
2337 ambex_pid=0,
2338 kick=None,
2339 banner_endpoint="http://127.0.0.1:8500/banner",
2340 metrics_endpoint="http://127.0.0.1:8500/metrics",
2341 k8s=False,
2342 no_checks=False,
2343 no_envoy=False,
2344 reload=False,
2345 debug=False,
2346 verbose=False,
2347 workers=None,
2348 port=-1,
2349 host="",
2350 notices=None,
2351 validation_retries=5,
2352 allow_fs_commands=False,
2353 local_scout=False,
2354 report_action_keys=False,
2355):
2356 """
2357 Run the diagnostic daemon.
2358
2359 Arguments:
2360
2361 SNAPSHOT_PATH
2362 Path to directory in which to save configuration snapshots and dynamic secrets
2363
2364 BOOTSTRAP_PATH
2365 Path to which to write bootstrap Envoy configuration
2366
2367 ADS_PATH
2368 Path to which to write ADS Envoy configuration
2369 """
2370
2371 enable_fast_reconfigure = parse_bool(os.environ.get("AMBASSADOR_FAST_RECONFIGURE", "true"))
2372
2373 if port < 0:
2374 port = Constants.DIAG_PORT if not enable_fast_reconfigure else Constants.DIAG_PORT_ALT
2375 # port = Constants.DIAG_PORT
2376
2377 if not host:
2378 host = "0.0.0.0" if not enable_fast_reconfigure else "127.0.0.1"
2379
2380 if dev_magic:
2381 # Override the world.
2382 os.environ["SCOUT_HOST"] = "127.0.0.1:9999"
2383 os.environ["SCOUT_HTTPS"] = "no"
2384
2385 no_checks = True
2386 no_envoy = True
2387
2388 os.makedirs("/tmp/snapshots", mode=0o755, exist_ok=True)
2389
2390 snapshot_path = "/tmp/snapshots"
2391 bootstrap_path = "/tmp/boot.json"
2392 ads_path = "/tmp/ads.json"
2393
2394 port = 9998
2395
2396 allow_fs_commands = True
2397 local_scout = True
2398 report_action_keys = True
2399
2400 if no_envoy:
2401 no_checks = True
2402
2403 # Create the application itself.
2404 app.setup(
2405 snapshot_path,
2406 bootstrap_path,
2407 ads_path,
2408 config_path,
2409 ambex_pid,
2410 kick,
2411 banner_endpoint,
2412 metrics_endpoint,
2413 k8s,
2414 not no_checks,
2415 no_envoy,
2416 reload,
2417 debug,
2418 verbose,
2419 notices,
2420 validation_retries,
2421 allow_fs_commands,
2422 local_scout,
2423 report_action_keys,
2424 enable_fast_reconfigure,
2425 )
2426
2427 if not workers:
2428 workers = number_of_workers()
2429
2430 gunicorn_config = {
2431 "bind": "%s:%s" % (host, port),
2432 # 'workers': 1,
2433 "threads": workers,
2434 }
2435
2436 app.logger.info(
2437 "thread count %d, listening on %s" % (gunicorn_config["threads"], gunicorn_config["bind"])
2438 )
2439
2440 StandaloneApplication(app, gunicorn_config).run()
2441
2442
2443if __name__ == "__main__":
2444 main()
View as plain text