1import datetime
2import logging
3import os
4import re
5from typing import Any, ClassVar, Dict, List, Optional, Union
6from typing import cast as typecast
7
8import semantic_version
9
10from .scout import Scout
11from .utils import dump_json, parse_bool, parse_json
12from .VERSION import Commit, Version
13
# A Scout notice is a flat string-to-string mapping; the notices built in this
# module carry a "level" key and a "message" key.
ScoutNotice = Dict[str, str]
15
16
class LocalScout:
    """Stand-in for Scout that records reports in memory and returns a faked result.

    Every call to ``report`` is appended to ``self.events`` so callers can
    inspect what would have been reported.
    """

    def __init__(self, logger, app: str, version: str, install_id: str) -> None:
        """Store the logger and identity fields and start with no recorded events."""
        self.logger = logger
        self.app, self.version, self.install_id = app, version, install_id

        self.events: List[Dict[str, Any]] = []

        self.logger.info(f"LocalScout: initialized for {app} {version}: ID {install_id}")

    def report(self, **kwargs) -> dict:
        """Record ``kwargs`` as an event and return a canned Scout response.

        ``kwargs`` must contain ``mode`` and ``action`` (a ``KeyError`` is raised
        otherwise). A ``local_scout_timestamp`` is always stamped onto the event,
        and ``timestamp`` is filled in only when the caller did not supply one.
        """
        # The very dict stored here is mutated below, so the recorded event
        # also carries the timestamps.
        self.events.append(kwargs)

        mode, action = kwargs["mode"], kwargs["action"]

        stamp = datetime.datetime.now().timestamp()
        kwargs["local_scout_timestamp"] = stamp
        kwargs.setdefault("timestamp", stamp)

        self.logger.info(f"LocalScout: mode {mode}, action {action} ({kwargs})")

        fake_notice = {"level": "WARNING", "message": "Using LocalScout, result is faked!"}

        response = {
            "latest_version": self.version,
            "application": self.app,
            "cached": False,
            "notices": [fake_notice],
            "timestamp": stamp,
        }
        return response

    def reset_events(self) -> None:
        """Forget all recorded events by rebinding ``events`` to a new empty list."""
        self.events = list()
53
54
class AmbScout:
    """Ambassador-side wrapper around Scout with caching and notice handling.

    The underlying reporter (a real ``Scout`` or, when ``local_only`` is set, a
    ``LocalScout``) is created lazily by the ``scout`` property. ``report``
    rate-limits calls to it according to ``update_frequency``, remembers the
    latest version it was told about, and normalizes notices into
    ``{"level": ..., "message": ...}`` dicts.
    """

    # Compiled patterns kept as class attributes; they are not used within
    # this class itself.
    reTaggedBranch: ClassVar = re.compile(r"^v?(\d+\.\d+\.\d+)(-[a-zA-Z][a-zA-Z]\.\d+)?$")
    reGitDescription: ClassVar = re.compile(r"-(\d+)-g([0-9a-f]+)$")

    install_id: str
    runtime: str
    namespace: str

    version: str
    semver: Optional[semantic_version.Version]

    _scout: Optional[Union[Scout, LocalScout]]
    _scout_error: Optional[str]

    _notices: Optional[List[ScoutNotice]]
    _last_result: Optional[dict]
    _last_update: Optional[datetime.datetime]
    _update_frequency: datetime.timedelta

    _latest_version: Optional[str] = None
    _latest_semver: Optional[semantic_version.Version] = None

    def __init__(
        self, install_id=None, update_frequency=datetime.timedelta(hours=12), local_only=False
    ) -> None:
        """Set up identity, version info and an empty cache.

        Args:
            install_id: Installation ID. When falsy, it is read from
                ``AMBASSADOR_CLUSTER_ID``, then ``AMBASSADOR_SCOUT_ID``, and
                finally defaults to the all-zero UUID.
            update_frequency: Minimum time between two real Scout calls when
                caching is in effect.
            local_only: When true, a ``LocalScout`` is used instead of ``Scout``.
        """
        if not install_id:
            install_id = os.environ.get(
                "AMBASSADOR_CLUSTER_ID",
                os.environ.get("AMBASSADOR_SCOUT_ID", "00000000-0000-0000-0000-000000000000"),
            )

        self.install_id = install_id
        self.runtime = "kubernetes" if os.environ.get("KUBERNETES_SERVICE_HOST", None) else "docker"
        self.namespace = os.environ.get("AMBASSADOR_NAMESPACE", "default")

        # Allow an environment variable to state whether we're in Edge Stack. But keep the
        # existing condition as sufficient, so that there is less of a chance of breaking
        # things running in a container with this file present.
        self.is_edge_stack = parse_bool(os.environ.get("EDGE_STACK", "false")) or os.path.exists(
            "/ambassador/.edge_stack"
        )
        self.app = "aes" if self.is_edge_stack else "ambassador"
        self.version = Version
        self.semver = self.get_semver(self.version)

        self.logger = logging.getLogger("ambassador.scout")
        # self.logger.setLevel(logging.DEBUG)

        # Lazy %-style arguments: the message is only formatted if DEBUG is enabled.
        self.logger.debug("Ambassador version %s built from %s", Version, Commit)
        self.logger.debug(
            "Scout version %s%s", self.version, " - BAD SEMVER" if not self.semver else ""
        )
        self.logger.debug("Runtime %s", self.runtime)
        self.logger.debug("Namespace %s", self.namespace)

        self._scout = None
        self._scout_error = None

        self._local_only = local_only

        self._notices = None
        self._last_result = None
        self._update_frequency = update_frequency
        self._latest_version = None
        self._latest_semver = None

        self.reset_cache_time()

    def reset_cache_time(self) -> None:
        """Invalidate the cache so the next ``report`` contacts Scout.

        ``report`` treats a missing ``_last_update`` as "needs update", which
        holds for any ``update_frequency`` (back-dating by a fixed 24 hours
        would not invalidate the cache for frequencies of 24 hours or more).
        """
        self._last_update = None

    def reset_events(self) -> None:
        """Clear the events recorded by the ``LocalScout``; no-op unless ``local_only``."""
        if not self._local_only:
            return

        # Go through the lazy property so this works even when nothing has
        # touched the scout yet, instead of relying on an ``assert``.
        scout = self.scout

        if isinstance(scout, LocalScout):
            scout.reset_events()

    def __str__(self) -> str:
        """Return ``"<OK|??>: <last error or OK>"`` describing the scout state."""
        return "%s: %s" % (
            "OK" if self._scout else "??",
            self._scout_error if self._scout_error else "OK",
        )

    @property
    def scout(self) -> Optional[Union[Scout, LocalScout]]:
        """Return the underlying reporter, creating it on first use.

        Returns ``None`` when constructing ``Scout`` raised ``OSError``; the
        error text is kept in ``_scout_error`` and construction is attempted
        again on the next access.
        """
        if not self._scout:
            if self._local_only:
                self._scout = LocalScout(
                    logger=self.logger,
                    app=self.app,
                    version=self.version,
                    install_id=self.install_id,
                )
                self.logger.debug("LocalScout initialized")
            else:
                try:
                    self._scout = Scout(
                        app=self.app, version=self.version, install_id=self.install_id
                    )
                    self._scout_error = None
                    self.logger.debug("Scout connection established")
                except OSError as e:
                    self._scout = None
                    self._scout_error = str(e)
                    self.logger.debug(
                        "Scout connection failed, will retry later: %s", self._scout_error
                    )

        return self._scout

    @staticmethod
    def _normalize_notices(raw_notices) -> List[ScoutNotice]:
        """Convert raw notices (strings, dicts or anything else) to level/message dicts."""
        normalized: List[ScoutNotice] = []

        for notice in raw_notices:
            if isinstance(notice, str):
                normalized.append({"level": "WARNING", "message": notice})
            elif isinstance(notice, dict):
                level = notice.get("level", "WARNING")
                msg = notice.get("message", None)

                # Dict notices without a message are dropped.
                if msg:
                    # A non-string level cannot be upper-cased; fall back to WARNING.
                    lvl = level.upper() if isinstance(level, str) else "WARNING"
                    normalized.append({"level": lvl, "message": msg})
            else:
                normalized.append({"level": "WARNING", "message": dump_json(notice)})

        return normalized

    def report(self, force_result: Optional[dict] = None, no_cache=False, **kwargs) -> dict:
        """Report to Scout (or reuse a cached/forced result) and return the result dict.

        Args:
            force_result: Result to use instead of contacting Scout. It is
                copied, so the caller's dict is not modified.
            no_cache: When true, ignore both the ``AMBASSADOR_SCOUT_RESULT``
                environment override and the cached result.
            **kwargs: Passed to the underlying reporter's ``report``; ``runtime``
                and ``commit`` are filled in when absent.

        Returns:
            A dict that always has ``cached`` and ``timestamp`` keys, and a
            ``notices`` key when there is at least one notice.
        """
        _notices: List[ScoutNotice] = []

        use_cache = not no_cache

        if use_cache:
            env_result = os.environ.get("AMBASSADOR_SCOUT_RESULT", None)

            if env_result:
                force_result = parse_json(env_result)

        # Work on a copy so the keys added below never leak into the caller's dict.
        result: Optional[dict] = (
            dict(force_result) if isinstance(force_result, dict) else force_result
        )
        result_was_cached: bool = False

        if not result:
            kwargs.setdefault("runtime", self.runtime)
            kwargs.setdefault("commit", Commit)

            # How long since the last Scout update? If it's been longer than
            # self._update_frequency (or there was no update yet), check Scout again.
            now = datetime.datetime.now()

            needs_update = True

            if use_cache and self._last_update:
                needs_update = (now - self._last_update) > self._update_frequency

            if needs_update:
                scout = self.scout

                if scout:
                    result = scout.report(**kwargs)

                    self._last_update = now
                    self._last_result = dict(result) if result else None
                else:
                    result = {"scout": "unavailable: %s" % self._scout_error}
                    _notices.append(
                        {
                            "level": "DEBUG",
                            "message": "scout temporarily unavailable: %s" % self._scout_error,
                        }
                    )

                # The result is stamped with the current time whether or not we could
                # talk to Scout. NOTE: self._last_update is only advanced on success, so
                # an unavailable Scout is tried again on the next report().
                result_timestamp = datetime.datetime.now()
            else:
                _notices.append({"level": "DEBUG", "message": "Returning cached result"})
                result = dict(self._last_result) if self._last_result else None
                result_was_cached = True

                # We can't get here unless self._last_update is set.
                result_timestamp = typecast(datetime.datetime, self._last_update)
        else:
            _notices.append({"level": "INFO", "message": "Returning forced Scout result"})
            result_timestamp = datetime.datetime.now()

        if not self.semver:
            _notices.append(
                {
                    "level": "WARNING",
                    "message": "Ambassador has invalid version '%s'??!" % self.version,
                }
            )

        if result:
            result["cached"] = result_was_cached
        else:
            result = {"cached": False}

        result["timestamp"] = result_timestamp.timestamp()

        # Do version & notices stuff.
        if "latest_version" in result:
            latest_version = result["latest_version"]
            latest_semver = self.get_semver(latest_version)

            if latest_semver:
                self._latest_version = latest_version
                self._latest_semver = latest_semver
            else:
                _notices.append(
                    {
                        "level": "WARNING",
                        "message": "Scout returned invalid version '%s'??!" % latest_version,
                    }
                )

        if self._latest_semver and ((not self.semver) or (self._latest_semver > self.semver)):
            _notices.append(
                {
                    "level": "INFO",
                    "message": "Upgrade available! to Ambassador version %s" % self._latest_semver,
                }
            )

        # A missing or null "notices" entry simply contributes nothing.
        _notices.extend(self._normalize_notices(result.get("notices") or []))

        self._notices = _notices

        if self._notices:
            result["notices"] = self._notices
        else:
            result.pop("notices", None)

        return result

    @staticmethod
    def get_semver(version_string: str) -> Optional[semantic_version.Version]:
        """Parse ``version_string`` as a semantic version; return ``None`` if it is invalid.

        ``TypeError`` is handled as well as ``ValueError`` so that a non-string
        value (for example a number coming from JSON) is reported as invalid
        rather than raising.
        """
        try:
            return semantic_version.Version(version_string)
        except (ValueError, TypeError):
            return None
View as plain text