"""Tests for EnvoyStatsMgr, driven by canned files under ./testdata."""

import logging
import os
import sys
import threading
import time
from typing import Optional

import pytest

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s test %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

logger = logging.getLogger("ambassador")

# NOTE(review): this import sits below the logging setup in the original
# file; the order is preserved here -- confirm before moving it to the top.
from ambassador.diagnostics import EnvoyStatsMgr


class EnvoyStatsMocker:
    """Stand-in for Envoy's admin endpoints that replays numbered fixtures.

    Each fetch call bumps its own counter and returns the contents of
    ``logging-<n>.txt`` or ``stats-<n>.txt`` from the ``testdata`` directory
    next to this file, or ``None`` when that file cannot be read.
    """

    def __init__(self) -> None:
        self.test_dir = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "testdata",
        )

        # Number of log-level / stats fetches made so far; used to pick the
        # next fixture file.
        self.log_idx = 0
        self.stats_idx = 0

    def _read_fixture(self, filename: str) -> Optional[str]:
        """Return the text of ``filename`` in the test directory, or None.

        Any OSError while opening or reading (e.g. the file does not exist)
        is reported as ``None``. The file handle is always closed.
        """
        path = os.path.join(self.test_dir, filename)

        try:
            with open(path, "r") as handle:
                return handle.read()
        except OSError:
            return None

    def fetch_log_levels(self, level: Optional[str]) -> Optional[str]:
        """Return the next ``logging-<n>.txt`` fixture; ``level`` is ignored."""
        self.log_idx += 1
        return self._read_fixture(f"logging-{self.log_idx}.txt")

    def fetch_envoy_stats(self) -> Optional[str]:
        """Return the next ``stats-<n>.txt`` fixture."""
        self.stats_idx += 1
        return self._read_fixture(f"stats-{self.stats_idx}.txt")

    def slow_fetch_stats(self) -> Optional[str]:
        """Like fetch_envoy_stats, but sleeps five seconds first."""
        time.sleep(5)
        return self.fetch_envoy_stats()


def test_levels():
    """Two consecutive updates yield the expected ``loginfo`` mappings."""
    mocker = EnvoyStatsMocker()

    esm = EnvoyStatsMgr(
        logger, fetch_log_levels=mocker.fetch_log_levels, fetch_envoy_stats=mocker.fetch_envoy_stats
    )

    esm.update()
    assert esm.loginfo == {"all": "error"}

    # This one may be a bit more fragile than we'd like.
    esm.update()
    assert esm.loginfo == {
        "error": [
            "admin",
            "aws",
            "assert",
            "backtrace",
            "cache_filter",
            "client",
            "config",
            "connection",
            "conn_handler",
            "decompression",
            "envoy_bug",
            "ext_authz",
            "rocketmq",
            "file",
            "filter",
            "forward_proxy",
            "grpc",
            "hc",
            "health_checker",
            "http",
            "http2",
            "hystrix",
            "init",
            "io",
            "jwt",
            "kafka",
            "main",
            "misc",
            "mongo",
            "quic",
            "quic_stream",
            "pool",
            "rbac",
            "redis",
            "router",
            "runtime",
            "stats",
            "secret",
            "tap",
            "testing",
            "thrift",
            "tracing",
            "upstream",
            "udp",
            "wasm",
        ],
        "info": ["dubbo"],
        "warning": ["lua"],
    }


def test_stats():
    """One update produces the expected stats; a second yields a new object."""
    mocker = EnvoyStatsMocker()

    esm = EnvoyStatsMgr(
        logger, fetch_log_levels=mocker.fetch_log_levels, fetch_envoy_stats=mocker.fetch_envoy_stats
    )

    esm.update()
    stats = esm.get_stats()

    assert stats.last_attempt >= stats.created
    assert stats.last_update > stats.last_attempt
    assert stats.update_errors == 0

    assert stats.requests == {"total": 19, "4xx": 19, "5xx": 0, "bad": 19, "ok": 0}

    assert stats.clusters["cluster_127_0_0_1_8500_ambassador"] == {
        "healthy_members": 1,
        "total_members": 1,
        "healthy_percent": 100,
        "update_attempts": 4220,
        "update_successes": 4220,
        "update_percent": 100,
        "upstream_ok": 14,
        "upstream_4xx": 14,
        "upstream_5xx": 0,
        "upstream_bad": 0,
    }

    assert stats.clusters["cluster_identity_api_jennifer_testing_sv-0"] == {
        "healthy_members": 1,
        "total_members": 1,
        "healthy_percent": None,
        "update_attempts": 4216,
        "update_successes": 4216,
        "update_percent": 100,
        "upstream_ok": 0,
        "upstream_4xx": 0,
        "upstream_5xx": 0,
        "upstream_bad": 0,
    }

    assert stats.envoy["cluster_manager"] == {
        "active_clusters": 336,
        "cds": {
            "init_fetch_timeout": 0,
            "update_attempt": 15,
            "update_failure": 0,
            "update_rejected": 0,
            "update_success": 14,
            "update_time": 1602023101467,
            "version": 11975404232982186540,
        },
        "cluster_added": 336,
        "cluster_modified": 0,
        "cluster_removed": 0,
        "cluster_updated": 0,
        "cluster_updated_via_merge": 0,
        "update_merge_cancelled": 0,
        "update_out_of_merge_window": 0,
        "warming_clusters": 0,
    }

    assert stats.envoy["control_plane"] == {
        "connected_state": 1,
        "pending_requests": 0,
        "rate_limit_enforced": 0,
    }

    assert stats.envoy["listener_manager"] == {
        "lds": {
            "init_fetch_timeout": 0,
            "update_attempt": 32,
            "update_failure": 0,
            "update_rejected": 17,
            "update_success": 14,
            "update_time": 1602023102107,
            "version": 11975404232982186540,
        },
        "listener_added": 2,
        "listener_create_failure": 0,
        "listener_create_success": 8,
        "listener_in_place_updated": 0,
        "listener_modified": 0,
        "listener_removed": 0,
        "listener_stopped": 0,
        "total_filter_chains_draining": 0,
        "total_listeners_active": 2,
        "total_listeners_draining": 0,
        "total_listeners_warming": 0,
        "workers_started": 1,
    }

    esm.update()
    stats2 = esm.get_stats()

    # A fresh update must hand back a different stats object.
    assert stats2 is not stats


def test_locks():
    """get_stats() stays fast and consistent while a slow update is running.

    The stats fetch is the five-second ``slow_fetch_stats`` and both
    ``max_live_age`` and ``max_ready_age`` are 3, so the checkpoints below
    (at roughly 0s, 2s, 4s, 6s and 10s) observe each alive/ready transition.
    """
    mocker = EnvoyStatsMocker()

    esm = EnvoyStatsMgr(
        logger,
        max_live_age=3,
        max_ready_age=3,
        fetch_log_levels=mocker.fetch_log_levels,
        fetch_envoy_stats=mocker.slow_fetch_stats,
    )

    def slow_background():
        # The result is deliberately not checked: an exception raised in this
        # worker thread would not propagate to the test's main thread.
        esm.update()

    def check_get_stats():
        # get_stats() must return in under a millisecond even while an
        # update is in flight.
        start = time.perf_counter()
        stats = esm.get_stats()
        end = time.perf_counter()

        assert (end - start) < 0.001
        return stats

    # Start updating in the background. This will take five seconds.
    threading.Thread(target=slow_background).start()

    # At this point, we should be able to get stats very quickly, and see
    # alive but not ready.
    sys.stdout.write("1")
    sys.stdout.flush()
    stats1 = check_get_stats()
    assert stats1.is_alive()
    assert not stats1.is_ready()

    # Wait 2 seconds. We should get the _same_ stats object, and again,
    # alive but not ready.
    time.sleep(2)
    sys.stdout.write("2")
    sys.stdout.flush()
    stats2 = check_get_stats()
    assert stats2 is stats1
    assert stats2.is_alive()
    assert not stats2.is_ready()

    # We should also see the update_lock being held.
    assert not esm.update_lock.acquire(blocking=False)

    # Wait 2 more seconds. We should get the same stats object, but it should
    # now say neither alive nor ready.
    time.sleep(2)
    sys.stdout.write("3")
    sys.stdout.flush()
    stats3 = check_get_stats()
    assert stats3 is stats1
    assert not stats3.is_alive()
    assert not stats3.is_ready()

    # Wait 2 more seconds. At this point, we should get a new stats object,
    # and we should see alive and ready.
    time.sleep(2)
    sys.stdout.write("4")
    sys.stdout.flush()
    stats4 = check_get_stats()
    assert stats4 is not stats1
    assert stats4.is_alive()
    assert stats4.is_ready()

    # The update lock should not be held, either.
    assert esm.update_lock.acquire(blocking=False)
    esm.update_lock.release()

    # Finally, if we wait four more seconds, we should still have the same
    # stats object as last time, but we should see neither alive nor ready.
    time.sleep(4)
    sys.stdout.write("5")
    sys.stdout.flush()
    stats5 = check_get_stats()
    assert stats5 is stats4
    assert not stats5.is_alive()
    assert not stats5.is_ready()


if __name__ == "__main__":
    pytest.main(sys.argv)