# Copyright 2018 Datawire. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
14
import logging
import threading
import time
from dataclasses import dataclass
from dataclasses import field as dc_field
from dataclasses import replace as dc_replace
from typing import Any, Callable, Dict, List, Optional, Union

import requests
23
24
def percentage(x: float, y: float) -> int:
    """
    Return x as a whole-number percentage of y.

    The ratio is scaled by 100, has 0.5 added, and is then truncated by
    int(). A y of zero yields 0 rather than a division error.
    """
    if y == 0:
        return 0

    scaled = (x * 100) / y
    return int(scaled + 0.5)
30
31
@dataclass(frozen=True)
class EnvoyStats:
    """
    Immutable snapshot of what we currently know about Envoy.

    Instances are never modified in place: whenever something changes, a new
    EnvoyStats is built and swapped in for the old one.
    """

    # Maximum age, in seconds, of the last successful update for is_alive()
    # and is_ready() respectively to still return True.
    max_live_age: int = 120
    max_ready_age: int = 120

    # time.time()-style timestamps: when this stats history began, when stats
    # were last successfully updated, and when an update was last attempted.
    created: float = 0.0
    last_update: Optional[float] = None
    last_attempt: Optional[float] = None

    # Running count of failed update attempts.
    update_errors: int = 0

    # Yes yes yes I know -- the contents of these dicts are not immutable.
    # That's OK for now, but realize that you mustn't go munging around altering
    # things in here once they're assigned!
    requests: Dict[str, Any] = dc_field(default_factory=dict)
    clusters: Dict[str, Any] = dc_field(default_factory=dict)
    envoy: Dict[str, Any] = dc_field(default_factory=dict)

    def is_alive(self) -> bool:
        """
        Make sure we've heard from Envoy within max_live_age seconds.

        If we haven't yet heard from Envoy at all (we've just booted),
        consider Envoy alive if we haven't yet been running for max_live_age
        seconds -- basically, Envoy gets a grace period to start running at
        boot time.
        """

        # Fall back to our creation time until the first successful update.
        epoch = self.last_update or self.created

        return (time.time() - epoch) <= self.max_live_age

    def is_ready(self) -> bool:
        """
        Make sure we've heard from Envoy within max_ready_age seconds.

        If we haven't yet heard from Envoy at all (we've just booted),
        then Envoy is not yet ready, and is_ready() returns False.
        """

        if not self.last_update:
            return False

        return (time.time() - self.last_update) <= self.max_ready_age

    def time_since_boot(self) -> float:
        """Return the number of seconds elapsed since the `created` timestamp."""
        return time.time() - self.created

    def time_since_update(self) -> Optional[float]:
        """
        Return the number of seconds since we last heard from Envoy, or None if
        we've never heard from Envoy.
        """

        if not self.last_update:
            return None

        return time.time() - self.last_update

    def cluster_stats(self, name: str) -> Dict[str, Any]:
        """
        Return a display-oriented summary of the named cluster.

        The result always contains "valid", "reason", "health", "hmetric" and
        "hcolor". When the cluster is known, the result is a copy of its entry
        in `clusters` with those keys added, so the stored entry is never
        modified.
        """

        if not self.last_update:
            # No updates.
            return {
                "valid": False,
                "reason": "No stats updates have succeeded",
                "health": "no stats yet",
                "hmetric": "startup",
                "hcolor": "grey",
            }

        if name not in self.clusters:
            return {
                "valid": False,
                "reason": "Cluster %s is not defined" % name,
                "health": "undefined cluster",
                "hmetric": "undefined cluster",
                "hcolor": "orange",
            }

        # Work on a shallow copy: the stored dicts must not be altered.
        cstat = dict(self.clusters[name])
        cstat.update(
            {"valid": True, "reason": "Cluster %s updated at %d" % (name, self.last_update)}
        )

        pct = cstat.get("healthy_percent")

        if pct is None:
            cstat.update(
                {
                    "health": "Unknown health: no requests yet",
                    "hmetric": "waiting",
                    "hcolor": "grey",
                }
            )
        else:
            # Below 70% is red, below 90% is yellow, anything else is green.
            if pct < 70:
                color = "red"
            elif pct < 90:
                color = "yellow"
            else:
                color = "green"

            cstat.update({"health": "%d%% healthy" % pct, "hmetric": str(pct), "hcolor": color})

        return cstat
143
144
145LogLevelFetcher = Callable[[Optional[str]], Optional[str]]
146EnvoyStatsFetcher = Callable[[], Optional[str]]
147
148
149class EnvoyStatsMgr:
150 # fetch_log_levels and fetch_envoy_stats are debugging hooks
151 def __init__(
152 self,
153 logger: logging.Logger,
154 max_live_age: int = 120,
155 max_ready_age: int = 120,
156 fetch_log_levels: Optional[LogLevelFetcher] = None,
157 fetch_envoy_stats: Optional[EnvoyStatsFetcher] = None,
158 ) -> None:
159 self.logger = logger
160 self.loginfo: Dict[str, Union[str, List[str]]] = {}
161
162 self.update_lock = threading.Lock()
163 self.access_lock = threading.Lock()
164
165 self.fetch_log_levels = fetch_log_levels or self._fetch_log_levels
166 self.fetch_envoy_stats = fetch_envoy_stats or self._fetch_envoy_stats
167
168 self.stats = EnvoyStats(
169 created=time.time(), max_live_age=max_live_age, max_ready_age=max_ready_age
170 )
171
172 def _fetch_log_levels(self, level: Optional[str]) -> Optional[str]:
173 try:
174 url = "http://127.0.0.1:8001/logging"
175
176 if level:
177 url += "?level=%s" % level
178
179 r = requests.post(url)
180
181 # OMFG. Querying log levels returns with a 404 code.
182 if (r.status_code != 200) and (r.status_code != 404):
183 self.logger.warning("EnvoyStats.update_log_levels failed: %s" % r.text)
184 return None
185
186 return r.text
187 except Exception as e:
188 self.logger.warning("EnvoyStats.update_log_levels failed: %s" % e)
189 return None
190
191 def _fetch_envoy_stats(self) -> Optional[str]:
192 try:
193 r = requests.get("http://127.0.0.1:8001/stats")
194
195 if r.status_code != 200:
196 self.logger.warning("EnvoyStats.update failed: %s" % r.text)
197 return None
198
199 return r.text
200 except OSError as e:
201 self.logger.warning("EnvoyStats.update failed: %s" % e)
202 return None
203
204 def update_log_levels(self, last_attempt: float, level: Optional[str] = None) -> bool:
205 """
206 Heavy lifting around updating the Envoy log levels.
207
208 You MUST hold the update lock when calling this method.
209 You MUST NOT hold the access lock when calling this method.
210
211 update_log_levels does all the work of talking to Envoy and computing
212 new stats, then grabs the access_lock just long enough to update the data
213 structures for others to look at.
214 """
215
216 text = self.fetch_log_levels(level)
217
218 if not text:
219 # Ew.
220 with self.access_lock:
221 # EnvoyStats is immutable, so...
222 new_stats = EnvoyStats(
223 max_live_age=self.stats.max_live_age,
224 max_ready_age=self.stats.max_ready_age,
225 created=self.stats.created,
226 last_update=self.stats.last_update,
227 last_attempt=last_attempt, # THIS IS A CHANGE
228 update_errors=self.stats.update_errors + 1, # THIS IS A CHANGE
229 requests=self.stats.requests,
230 clusters=self.stats.clusters,
231 envoy=self.stats.envoy,
232 )
233
234 self.stats = new_stats
235
236 return False
237
238 levels: Dict[str, Dict[str, bool]] = {}
239
240 for line in text.split("\n"):
241 if not line:
242 continue
243
244 if line.startswith(" "):
245 (logtype, level) = line[2:].split(": ")
246
247 x = levels.setdefault(level, {})
248 x[logtype] = True
249
250 loginfo: Dict[str, Union[str, List[str]]]
251
252 if len(levels.keys()) == 1:
253 loginfo = {"all": list(levels.keys())[0]}
254 else:
255 loginfo = {x: list(levels[x].keys()) for x in levels.keys()}
256
257 with self.access_lock:
258 self.loginfo = loginfo
259 return True
260
261 def get_stats(self) -> EnvoyStats:
262 """
263 Get the current Envoy stats object, safely.
264
265 You MUST NOT hold the access_lock when calling this method.
266 """
267
268 with self.access_lock:
269 return self.stats
270
271 def get_prometheus_stats(self) -> str:
272 try:
273 r = requests.get("http://127.0.0.1:8001/stats/prometheus")
274 except OSError as e:
275 self.logger.warning("EnvoyStats.get_prometheus_state failed: %s" % e)
276 return ""
277
278 if r.status_code != 200:
279 self.logger.warning("EnvoyStats.get_prometheus_state failed: %s" % r.text)
280 return ""
281 return r.text
282
283 def update_envoy_stats(self, last_attempt: float) -> None:
284 """
285 Heavy lifting around updating the Envoy stats.
286
287 You MUST hold the update lock when calling this method.
288 You MUST NOT hold the access lock when calling this method.
289
290 update_envoy_stats does all the work of talking to Envoy and computing
291 new stats, then grabs the access_lock just long enough to update the data
292 structures for others to look at.
293 """
294
295 text = self.fetch_envoy_stats()
296
297 if not text:
298 # EnvoyStats is immutable, so...
299 new_stats = EnvoyStats(
300 max_live_age=self.stats.max_live_age,
301 max_ready_age=self.stats.max_ready_age,
302 created=self.stats.created,
303 last_update=self.stats.last_update,
304 last_attempt=last_attempt, # THIS IS A CHANGE
305 update_errors=self.stats.update_errors + 1, # THIS IS A CHANGE
306 requests=self.stats.requests,
307 clusters=self.stats.clusters,
308 envoy=self.stats.envoy,
309 )
310
311 with self.access_lock:
312 self.stats = new_stats
313 return
314
315 # Parse stats into a hierarchy.
316 envoy_stats: Dict[str, Any] = {} # Ew.
317
318 for line in text.split("\n"):
319 if not line:
320 continue
321
322 # TODO: Splitting from the right is a work-around for the
323 # following issue: https://github.com/emissary-ingress/emissary/issues/4528
324 # and needs to be addressed via a behavior change
325 key, value = line.rsplit(":", 1)
326 keypath = key.split(".")
327
328 node = envoy_stats
329
330 for key in keypath[:-1]:
331 if key not in node:
332 node[key] = {}
333
334 node = node[key]
335
336 value = value.strip()
337
338 try:
339 node[keypath[-1]] = int(value)
340 except:
341 continue
342
343 # Now dig into clusters a bit more.
344
345 requests_info = {}
346 active_clusters = {}
347
348 if ("http" in envoy_stats) and ("ingress_http" in envoy_stats["http"]):
349 ingress_stats = envoy_stats["http"]["ingress_http"]
350
351 requests_total = ingress_stats.get("downstream_rq_total", 0)
352
353 requests_4xx = ingress_stats.get("downstream_rq_4xx", 0)
354 requests_5xx = ingress_stats.get("downstream_rq_5xx", 0)
355 requests_bad = requests_4xx + requests_5xx
356
357 requests_ok = requests_total - requests_bad
358
359 requests_info = {
360 "total": requests_total,
361 "4xx": requests_4xx,
362 "5xx": requests_5xx,
363 "bad": requests_bad,
364 "ok": requests_ok,
365 }
366
367 if "cluster" in envoy_stats:
368 for cluster_name in envoy_stats["cluster"]:
369 cluster = envoy_stats["cluster"][cluster_name]
370
371 healthy_percent: Optional[int]
372
373 healthy_members = cluster["membership_healthy"]
374 total_members = cluster["membership_total"]
375 healthy_percent = percentage(healthy_members, total_members)
376
377 update_attempts = cluster["update_attempt"]
378 update_successes = cluster["update_success"]
379 update_percent = percentage(update_successes, update_attempts)
380
381 upstream_total = cluster.get("upstream_rq_completed", 0)
382
383 upstream_4xx = cluster.get("upstream_rq_4xx", 0)
384 upstream_5xx = cluster.get("upstream_rq_5xx", 0)
385 upstream_bad = upstream_5xx # used to include 4XX here, but that seems wrong.
386
387 upstream_ok = upstream_total - upstream_bad
388
389 if upstream_total > 0:
390 healthy_percent = percentage(upstream_ok, upstream_total)
391 else:
392 healthy_percent = None
393
394 active_clusters[cluster_name] = {
395 "healthy_members": healthy_members,
396 "total_members": total_members,
397 "healthy_percent": healthy_percent,
398 "update_attempts": update_attempts,
399 "update_successes": update_successes,
400 "update_percent": update_percent,
401 "upstream_ok": upstream_ok,
402 "upstream_4xx": upstream_4xx,
403 "upstream_5xx": upstream_5xx,
404 "upstream_bad": upstream_bad,
405 }
406
407 # OK, we're now officially finished with all the hard stuff.
408 last_update = time.time()
409
410 # Finally, set up the new EnvoyStats.
411 new_stats = EnvoyStats(
412 max_live_age=self.stats.max_live_age,
413 max_ready_age=self.stats.max_ready_age,
414 created=self.stats.created,
415 last_update=last_update, # THIS IS A CHANGE
416 last_attempt=last_attempt, # THIS IS A CHANGE
417 update_errors=self.stats.update_errors,
418 requests=requests_info, # THIS IS A CHANGE
419 clusters=active_clusters, # THIS IS A CHANGE
420 envoy=envoy_stats, # THIS IS A CHANGE
421 )
422
423 # Make sure we hold the access_lock while messing with self.stats!
424 with self.access_lock:
425 self.stats = new_stats
426
427 def update(self) -> None:
428 """
429 Update the Envoy stats object, including our take on Envoy's loglevel and
430 lower-level statistics.
431
432 You MUST NOT hold the update lock when calling this method.
433 You MUST NOT hold the access lock when calling this method.
434
435 The first thing that update_envoy_stats does is to acquire the update_lock.
436 If it cannot do so immediately, it assumes that another update is already
437 running, and returns without doing anything further.
438
439 update_envoy_stats uses update_log_levels and update_envoy_stats to do all
440 the heavy lifting around talking to Envoy, managing the access_lock, and
441 actually writing new data into the Envoy stats object.
442 """
443
444 # First up, try bailing early.
445 if not self.update_lock.acquire(blocking=False):
446 self.logger.warning("EStats update: skipping due to lock contention")
447 return
448
449 # If here, we have the lock. Make sure it gets released!
450 try:
451 # Remember when we started.
452 last_attempt = time.time()
453
454 self.update_log_levels(last_attempt)
455 self.update_envoy_stats(last_attempt)
456 except Exception as e:
457 self.logger.exception("could not update Envoy stats: %s" % e)
458 finally:
459 self.update_lock.release()
View as plain text