1import logging
2import os
3import sys
4import threading
5import time
6from typing import Optional
7
8import pytest
9
# Set up root logging first. The module under test is imported only
# afterwards, presumably so that anything it logs at import time already
# uses this configuration.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s test %(levelname)s: %(message)s",
    level=logging.INFO,
)

# Logger handed to every EnvoyStatsMgr these tests construct.
logger = logging.getLogger("ambassador")

from ambassador.diagnostics import EnvoyStatsMgr  # noqa: E402 (deliberately after logging setup)
19
20
class EnvoyStatsMocker:
    """Serve canned Envoy admin responses from fixture files.

    Each call to ``fetch_log_levels`` returns the next ``logging-N.txt``
    fixture, and each call to ``fetch_envoy_stats`` returns the next
    ``stats-N.txt`` fixture. N counts up from 1 independently for each kind.
    If the requested fixture is missing or unreadable, the fetcher returns
    ``None``; the counter still advances.
    """

    def __init__(self, test_dir: Optional[str] = None) -> None:
        """Create a mocker reading fixtures from ``test_dir``.

        :param test_dir: Directory holding the fixture files. Defaults to the
            ``testdata`` directory next to this file.
        """
        if test_dir is None:
            test_dir = os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                "testdata",
            )
        self.test_dir = test_dir

        # Index of the most recently requested fixture of each kind (0 = none yet).
        self.log_idx = 0
        self.stats_idx = 0

    def _read_fixture(self, filename: str) -> Optional[str]:
        """Return the text of ``filename`` in ``test_dir``, or None if it can't be opened."""
        path = os.path.join(self.test_dir, filename)

        try:
            # The context manager closes the handle immediately instead of
            # leaking it until garbage collection (which emits ResourceWarning).
            with open(path, "r", encoding="utf-8") as f:
                return f.read()
        except OSError:
            return None

    def fetch_log_levels(self, level: Optional[str]) -> Optional[str]:
        """Return the next ``logging-N.txt`` fixture, or None if it is missing.

        ``level`` is ignored. It is accepted only so this method can be passed
        as EnvoyStatsMgr's ``fetch_log_levels`` callback, which presumably
        calls it with a level argument.
        """
        self.log_idx += 1
        return self._read_fixture(f"logging-{self.log_idx}.txt")

    def fetch_envoy_stats(self) -> Optional[str]:
        """Return the next ``stats-N.txt`` fixture, or None if it is missing."""
        self.stats_idx += 1
        return self._read_fixture(f"stats-{self.stats_idx}.txt")

    def slow_fetch_stats(self, delay: float = 5.0) -> Optional[str]:
        """Sleep ``delay`` seconds, then behave like ``fetch_envoy_stats``.

        test_locks relies on the default five-second delay to keep an update
        in flight while it probes get_stats().
        """
        time.sleep(delay)
        return self.fetch_envoy_stats()
52
53
def test_levels():
    """Check the log-level summary EnvoyStatsMgr builds on successive updates.

    The mocker hands out logging-1.txt, then logging-2.txt, on successive
    fetch_log_levels() calls (presumably one call per update()).
    """
    source = EnvoyStatsMocker()
    mgr = EnvoyStatsMgr(
        logger,
        fetch_log_levels=source.fetch_log_levels,
        fetch_envoy_stats=source.fetch_envoy_stats,
    )

    # First snapshot: every logger shares one level, which is expected to be
    # summarized as a single "all" entry.
    mgr.update()
    assert mgr.loginfo == {"all": "error"}

    # Second snapshot: loggers are split across several levels. The list is
    # compared in order, so this one may be a bit more fragile than we'd like.
    error_loggers = [
        "admin",
        "aws",
        "assert",
        "backtrace",
        "cache_filter",
        "client",
        "config",
        "connection",
        "conn_handler",
        "decompression",
        "envoy_bug",
        "ext_authz",
        "rocketmq",
        "file",
        "filter",
        "forward_proxy",
        "grpc",
        "hc",
        "health_checker",
        "http",
        "http2",
        "hystrix",
        "init",
        "io",
        "jwt",
        "kafka",
        "main",
        "misc",
        "mongo",
        "quic",
        "quic_stream",
        "pool",
        "rbac",
        "redis",
        "router",
        "runtime",
        "stats",
        "secret",
        "tap",
        "testing",
        "thrift",
        "tracing",
        "upstream",
        "udp",
        "wasm",
    ]
    mgr.update()
    assert mgr.loginfo == {
        "error": error_loggers,
        "info": ["dubbo"],
        "warning": ["lua"],
    }
117
118
def test_stats():
    """Check the stats EnvoyStatsMgr derives from a stats dump.

    The mocker's first fetch_envoy_stats() call returns testdata/stats-1.txt.
    The expected numbers below presumably come from that fixture.
    """
    mocker = EnvoyStatsMocker()

    esm = EnvoyStatsMgr(
        logger, fetch_log_levels=mocker.fetch_log_levels, fetch_envoy_stats=mocker.fetch_envoy_stats
    )

    esm.update()
    stats = esm.get_stats()

    # Bookkeeping timestamps must all be populated and correctly ordered
    # after a successful update.
    assert stats.created is not None
    assert stats.last_attempt is not None
    assert stats.last_update is not None

    assert stats.last_attempt >= stats.created
    assert stats.last_update > stats.last_attempt
    assert stats.update_errors == 0

    assert stats.requests == {"total": 19, "4xx": 19, "5xx": 0, "bad": 19, "ok": 0}
    assert stats.clusters["cluster_127_0_0_1_8500_ambassador"] == {
        "healthy_members": 1,
        "total_members": 1,
        "healthy_percent": 100,
        "update_attempts": 4220,
        "update_successes": 4220,
        "update_percent": 100,
        "upstream_ok": 14,
        "upstream_4xx": 14,
        "upstream_5xx": 0,
        "upstream_bad": 0,
    }

    assert stats.clusters["cluster_identity_api_jennifer_testing_sv-0"] == {
        "healthy_members": 1,
        "total_members": 1,
        "healthy_percent": None,
        "update_attempts": 4216,
        "update_successes": 4216,
        "update_percent": 100,
        "upstream_ok": 0,
        "upstream_4xx": 0,
        "upstream_5xx": 0,
        "upstream_bad": 0,
    }

    assert stats.envoy["cluster_manager"] == {
        "active_clusters": 336,
        "cds": {
            "init_fetch_timeout": 0,
            "update_attempt": 15,
            "update_failure": 0,
            "update_rejected": 0,
            "update_success": 14,
            "update_time": 1602023101467,
            "version": 11975404232982186540,
        },
        "cluster_added": 336,
        "cluster_modified": 0,
        "cluster_removed": 0,
        "cluster_updated": 0,
        "cluster_updated_via_merge": 0,
        "update_merge_cancelled": 0,
        "update_out_of_merge_window": 0,
        "warming_clusters": 0,
    }

    assert stats.envoy["control_plane"] == {
        "connected_state": 1,
        "pending_requests": 0,
        "rate_limit_enforced": 0,
    }

    assert stats.envoy["listener_manager"] == {
        "lds": {
            "init_fetch_timeout": 0,
            "update_attempt": 32,
            "update_failure": 0,
            "update_rejected": 17,
            "update_success": 14,
            "update_time": 1602023102107,
            "version": 11975404232982186540,
        },
        "listener_added": 2,
        "listener_create_failure": 0,
        "listener_create_success": 8,
        "listener_in_place_updated": 0,
        "listener_modified": 0,
        "listener_removed": 0,
        "listener_stopped": 0,
        "total_filter_chains_draining": 0,
        "total_listeners_active": 2,
        "total_listeners_draining": 0,
        "total_listeners_warming": 0,
        "workers_started": 1,
    }

    # Each update() is expected to publish a new stats object.
    esm.update()
    stats2 = esm.get_stats()

    assert stats2 is not stats
219
220
def test_locks():
    """Check that get_stats() never waits on a slow update(), and that the
    alive/ready flags age out as expected.

    slow_fetch_stats() sleeps five seconds, and the manager is built with
    max_live_age=3 and max_ready_age=3. The checkpoints below, at roughly
    0s, 2s, 4s, 6s and 10s, therefore straddle both the in-flight update and
    the age limits.
    """
    mocker = EnvoyStatsMocker()

    esm = EnvoyStatsMgr(
        logger,
        max_live_age=3,
        max_ready_age=3,
        fetch_log_levels=mocker.fetch_log_levels,
        fetch_envoy_stats=mocker.slow_fetch_stats,
    )

    # Upper bound on a single get_stats() call. The background update takes
    # about 5s, so 0.1s still proves get_stats() did not wait for it. It also
    # tolerates scheduler jitter on loaded CI hosts, where a 1ms bound is
    # prone to spurious failures.
    max_get_stats_seconds = 0.1

    def check_get_stats(marker):
        # Progress marker, so a hang is easy to localize in captured output.
        sys.stdout.write(marker)
        sys.stdout.flush()

        start = time.perf_counter()
        stats = esm.get_stats()
        elapsed = time.perf_counter() - start

        assert elapsed < max_get_stats_seconds
        return stats

    # Start updating in the background. This will take five seconds.
    # daemon=True means a hung update cannot keep the interpreter alive.
    updater = threading.Thread(target=esm.update, daemon=True)
    updater.start()

    try:
        # At this point, we should be able to get stats very quickly, and see
        # alive but not ready.
        stats1 = check_get_stats("1")
        assert stats1.is_alive()
        assert not stats1.is_ready()

        # Wait 2 seconds. We should get the _same_ stats object, and again,
        # alive but not ready.
        time.sleep(2)
        stats2 = check_get_stats("2")
        assert stats2 is stats1
        assert stats2.is_alive()
        assert not stats2.is_ready()

        # We should also see the update_lock being held.
        assert not esm.update_lock.acquire(blocking=False)

        # Wait 2 more seconds. We should get the same stats object, but it
        # should now say neither alive nor ready.
        time.sleep(2)
        stats3 = check_get_stats("3")
        assert stats3 is stats1
        assert not stats3.is_alive()
        assert not stats3.is_ready()

        # Wait 2 more seconds. The update has finished by now, so we should
        # get a new stats object and see alive and ready.
        time.sleep(2)
        stats4 = check_get_stats("4")
        assert stats4 is not stats1
        assert stats4.is_alive()
        assert stats4.is_ready()

        # The update lock should not be held, either.
        assert esm.update_lock.acquire(blocking=False)
        esm.update_lock.release()

        # The background thread should also be done. Allow it a moment to
        # unwind after update() returns.
        updater.join(timeout=1)
        assert not updater.is_alive()

        # Finally, if we wait four more seconds, we should still have the same
        # stats object as last time, but we should see neither alive nor ready.
        time.sleep(4)
        stats5 = check_get_stats("5")
        assert stats5 is stats4
        assert not stats5.is_alive()
        assert not stats5.is_ready()
    finally:
        # Never leave the updater running into the next test, even when an
        # assertion above fails early.
        updater.join(timeout=10)
300
301
if __name__ == "__main__":
    # Propagate pytest's exit status. Otherwise, running this file directly
    # reports success to the shell/CI even when tests fail.
    sys.exit(pytest.main(sys.argv))
View as plain text