1import logging
2import os
3import sys
4import threading
5import time
6from typing import Optional
7
8import pytest
9
# Log format shared by everything this test module (and the code under test) emits.
_LOG_FORMAT = "%(asctime)s test %(levelname)s: %(message)s"
_LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"

logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATEFMT)

# Logger handed to EnvoyStatsMgr in every test below.
logger = logging.getLogger("ambassador")
17
18from ambassador.diagnostics import EnvoyStatsMgr
19
20
class EnvoyStatsMocker:
    """Fake Envoy fetchers that replay numbered fixture files.

    Each fetch method advances its own 1-based counter and returns the text of
    the next fixture in ``testdata/`` beside this module
    (``logging-1.txt``, ``logging-2.txt``, ... and ``stats-1.txt``, ...).
    Once the file for the current index cannot be read, it returns ``None``.
    """

    def __init__(self) -> None:
        self.test_dir = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "testdata",
        )

        # Index of the most recently requested fixture for each fetcher.
        self.log_idx = 0
        self.stats_idx = 0

    def _read_fixture(self, name: str) -> Optional[str]:
        """Return the contents of ``test_dir/name``, or ``None`` if it cannot be read."""
        path = os.path.join(self.test_dir, name)

        try:
            # Use a context manager so the handle is closed deterministically.
            with open(path, "r") as fixture:
                return fixture.read()
        except OSError:
            return None

    def fetch_log_levels(self, level: Optional[str]) -> Optional[str]:
        """Return the next ``logging-N.txt`` fixture, or ``None`` when exhausted.

        ``level`` is accepted to match the fetcher signature but is ignored:
        the fixture sequence alone determines the result.
        """
        self.log_idx += 1
        return self._read_fixture(f"logging-{self.log_idx}.txt")

    def fetch_envoy_stats(self) -> Optional[str]:
        """Return the next ``stats-N.txt`` fixture, or ``None`` when exhausted."""
        self.stats_idx += 1
        return self._read_fixture(f"stats-{self.stats_idx}.txt")

    def slow_fetch_stats(self) -> Optional[str]:
        """Like ``fetch_envoy_stats`` but sleeps 5 seconds first, to simulate a slow Envoy."""
        time.sleep(5)
        return self.fetch_envoy_stats()
52
53
def test_levels():
    """Check how EnvoyStatsMgr summarises Envoy logger levels across two updates."""
    fake = EnvoyStatsMocker()
    mgr = EnvoyStatsMgr(
        logger,
        fetch_log_levels=fake.fetch_log_levels,
        fetch_envoy_stats=fake.fetch_envoy_stats,
    )

    # First update: every logger is reported at a single level.
    mgr.update()
    assert mgr.loginfo == {"all": "error"}

    # Second update: loggers are grouped by level. This depends on the exact
    # contents and ordering of the fixture, so it may be more fragile than
    # we'd like.
    error_loggers = [
        "admin",
        "aws",
        "assert",
        "backtrace",
        "cache_filter",
        "client",
        "config",
        "connection",
        "conn_handler",
        "decompression",
        "envoy_bug",
        "ext_authz",
        "rocketmq",
        "file",
        "filter",
        "forward_proxy",
        "grpc",
        "hc",
        "health_checker",
        "http",
        "http2",
        "hystrix",
        "init",
        "io",
        "jwt",
        "kafka",
        "main",
        "misc",
        "mongo",
        "quic",
        "quic_stream",
        "pool",
        "rbac",
        "redis",
        "router",
        "runtime",
        "stats",
        "secret",
        "tap",
        "testing",
        "thrift",
        "tracing",
        "upstream",
        "udp",
        "wasm",
    ]
    expected = {
        "error": error_loggers,
        "info": ["dubbo"],
        "warning": ["lua"],
    }

    mgr.update()
    assert mgr.loginfo == expected
117
118
def test_stats():
    """Check the stats snapshot EnvoyStatsMgr builds from the stats fixtures.

    The first update presumably consumes ``testdata/stats-1.txt`` via the
    mocker; the assertions below pin the values parsed from it.
    """
    mocker = EnvoyStatsMocker()

    esm = EnvoyStatsMgr(
        logger,
        fetch_log_levels=mocker.fetch_log_levels,
        fetch_envoy_stats=mocker.fetch_envoy_stats,
    )

    esm.update()
    stats = esm.get_stats()

    # Bookkeeping after one update.
    assert stats.last_attempt >= stats.created
    assert stats.last_update > stats.last_attempt
    assert stats.update_errors == 0

    # Aggregate request counters.
    assert stats.requests == {"total": 19, "4xx": 19, "5xx": 0, "bad": 19, "ok": 0}

    # Per-cluster summaries.
    assert stats.clusters["cluster_127_0_0_1_8500_ambassador"] == {
        "healthy_members": 1,
        "total_members": 1,
        "healthy_percent": 100,
        "update_attempts": 4220,
        "update_successes": 4220,
        "update_percent": 100,
        "upstream_ok": 14,
        "upstream_4xx": 14,
        "upstream_5xx": 0,
        "upstream_bad": 0,
    }

    assert stats.clusters["cluster_identity_api_jennifer_testing_sv-0"] == {
        "healthy_members": 1,
        "total_members": 1,
        "healthy_percent": None,
        "update_attempts": 4216,
        "update_successes": 4216,
        "update_percent": 100,
        "upstream_ok": 0,
        "upstream_4xx": 0,
        "upstream_5xx": 0,
        "upstream_bad": 0,
    }

    # Raw Envoy subsystem stats.
    assert stats.envoy["cluster_manager"] == {
        "active_clusters": 336,
        "cds": {
            "init_fetch_timeout": 0,
            "update_attempt": 15,
            "update_failure": 0,
            "update_rejected": 0,
            "update_success": 14,
            "update_time": 1602023101467,
            "version": 11975404232982186540,
        },
        "cluster_added": 336,
        "cluster_modified": 0,
        "cluster_removed": 0,
        "cluster_updated": 0,
        "cluster_updated_via_merge": 0,
        "update_merge_cancelled": 0,
        "update_out_of_merge_window": 0,
        "warming_clusters": 0,
    }

    assert stats.envoy["control_plane"] == {
        "connected_state": 1,
        "pending_requests": 0,
        "rate_limit_enforced": 0,
    }

    assert stats.envoy["listener_manager"] == {
        "lds": {
            "init_fetch_timeout": 0,
            "update_attempt": 32,
            "update_failure": 0,
            "update_rejected": 17,
            "update_success": 14,
            "update_time": 1602023102107,
            "version": 11975404232982186540,
        },
        "listener_added": 2,
        "listener_create_failure": 0,
        "listener_create_success": 8,
        "listener_in_place_updated": 0,
        "listener_modified": 0,
        "listener_removed": 0,
        "listener_stopped": 0,
        "total_filter_chains_draining": 0,
        "total_listeners_active": 2,
        "total_listeners_draining": 0,
        "total_listeners_warming": 0,
        "workers_started": 1,
    }

    # A second update must publish a different stats object.
    esm.update()
    stats2 = esm.get_stats()

    assert stats2 is not stats
215
216
def test_locks():
    """Check that get_stats() never waits on a slow update, and that liveness/readiness age out.

    The background update takes about 5 seconds (``slow_fetch_stats`` sleeps
    5s), and both ``max_live_age`` and ``max_ready_age`` are 3 seconds. The
    checks below are timed against that schedule.
    """
    mocker = EnvoyStatsMocker()

    esm = EnvoyStatsMgr(
        logger,
        max_live_age=3,
        max_ready_age=3,
        fetch_log_levels=mocker.fetch_log_levels,
        fetch_envoy_stats=mocker.slow_fetch_stats,
    )

    # A get_stats() call that waited on the in-flight update would take
    # seconds. A 100ms bound therefore still detects blocking while tolerating
    # scheduler jitter on busy CI hosts.
    max_get_stats_seconds = 0.1

    # An exception raised in a worker thread does not fail the test by itself,
    # so collect any here and assert on them from the main thread.
    background_errors = []

    def slow_background():
        try:
            esm.update()
        except Exception as exc:  # surfaced via background_errors below
            background_errors.append(exc)

    def check_get_stats():
        start = time.perf_counter()
        stats = esm.get_stats()
        elapsed = time.perf_counter() - start

        assert elapsed < max_get_stats_seconds, f"get_stats() took {elapsed:.4f}s"
        return stats

    def progress(marker):
        # Markers show how far the test got if it hangs (visible with -s).
        sys.stdout.write(marker)
        sys.stdout.flush()

    # Start updating in the background. This will take five seconds.
    updater = threading.Thread(target=slow_background, daemon=True)
    updater.start()

    try:
        # At this point, we should be able to get stats very quickly, and see
        # alive but not ready.
        progress("1")
        stats1 = check_get_stats()
        assert stats1.is_alive()
        assert not stats1.is_ready()

        # Wait 2 seconds. We should get the _same_ stats object, and again,
        # alive but not ready.
        time.sleep(2)
        progress("2")
        stats2 = check_get_stats()
        assert stats2 is stats1
        assert stats2.is_alive()
        assert not stats2.is_ready()

        # We should also see the update_lock being held.
        assert not esm.update_lock.acquire(blocking=False)

        # Wait 2 more seconds. We should get the same stats object, but it
        # should now say neither alive nor ready.
        time.sleep(2)
        progress("3")
        stats3 = check_get_stats()
        assert stats3 is stats1
        assert not stats3.is_alive()
        assert not stats3.is_ready()

        # Wait 2 more seconds. At this point, we should get a new stats
        # object, and we should see alive and ready.
        time.sleep(2)
        progress("4")
        stats4 = check_get_stats()
        assert stats4 is not stats1
        assert stats4.is_alive()
        assert stats4.is_ready()

        # The update lock should not be held, either.
        assert esm.update_lock.acquire(blocking=False)
        esm.update_lock.release()

        # Finally, if we wait four more seconds, we should still have the
        # same stats object as last time, but we should see neither alive
        # nor ready.
        time.sleep(4)
        progress("5")
        stats5 = check_get_stats()
        assert stats5 is stats4
        assert not stats5.is_alive()
        assert not stats5.is_ready()
    finally:
        # Don't let the update thread outlive the test.
        updater.join(timeout=10)

    assert not background_errors, background_errors
296
297
if __name__ == "__main__":
    # Propagate pytest's exit status so a failing run exits non-zero.
    sys.exit(pytest.main(sys.argv))
View as plain text