1#!python
2
3# Copyright 2020 Datawire. All rights reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License
16
17import datetime
18import logging
19import time
20from typing import List, Optional, Tuple
21
PerfCounter = float


class ReconfigStats:
    """
    Track metrics for reconfigurations, whether complete or incremental.
    There's a surprising amount of business logic in here -- read carefully
    before messing with this!

    All timestamps are values on the time.perf_counter() clock. Its reference
    point is undefined, so 0.0 is a perfectly valid timestamp: "no timestamp"
    is always represented by None and must be tested with "is None", never by
    truthiness.
    """

    # Maximum number of entries retained in self.reconfigures.
    MAX_RECONFIGURES_TRACKED = 10

    def __init__(
        self,
        logger: logging.Logger,
        max_incr_between_checks: int = 100,
        max_time_between_checks: float = 600,
        max_config_between_timers: int = 10,
        max_time_between_timers: float = 120,
    ) -> None:
        """
        Initialize this ReconfigStats.

        :param logger: Logger used for debug output and for dump()
        :param max_incr_between_checks: Maximum number of outstanding incrementals before a sanity check
        :param max_time_between_checks: Maximum number of seconds between sanity checks
        :param max_config_between_timers: Maximum number of configurations before logging timers
        :param max_time_between_timers: Maximum number of seconds between logging timers
        """

        # Save config elements.
        self.logger = logger
        self.max_incr_between_checks = max_incr_between_checks
        self.max_time_between_checks = max_time_between_checks
        self.max_config_between_timers = max_config_between_timers
        self.max_time_between_timers = max_time_between_timers

        # self.reconfigures tracks the last few reconfigures, both for business
        # logic around the last reconfigure and for logging.
        self.reconfigures: List[Tuple[str, PerfCounter]] = []

        # self.counts tracks how many of each kind of reconfiguration have
        # happened, for metrics.
        self.counts = {"incremental": 0, "complete": 0}

        # In many cases, the previous complete reconfigure will have fallen out
        # of self.reconfigures, so we remember its timestamp separately.
        self.last_complete: Optional[PerfCounter] = None

        # Likewise, remember the time of the last sanity check...
        self.last_check: Optional[PerfCounter] = None

        # ...and the time of the last timer logs.
        self.last_timer_log: Optional[PerfCounter] = None

        # self.incrementals_outstanding is the number of incrementals since the
        # last complete. Once too many incrementals pile up, we do a sanity check.
        self.incrementals_outstanding = 0

        # self.configs_outstanding is the number of configurations (either kind)
        # since we last logged the timers. Once too many configurations pile up,
        # we log the timers.
        self.configs_outstanding = 0

        # self.checks is how many sanity checks we've done. self.errors is how many
        # of them failed.
        self.checks = 0
        self.errors = 0

    def mark(self, what: str, when: Optional[PerfCounter] = None) -> None:
        """
        Mark that a reconfigure has occurred. The 'what' parameter is one of
        "complete" for a complete reconfigure, "incremental" for an incremental,
        or "diag" to indicate that we're not really reconfiguring, we just generated
        the diagnostics so may need to log timers.

        :param what: "complete", "incremental", or "diag".
        :param when: The time at which this occurred. Can be None, meaning "now".
        :raises RuntimeError: if 'what' is not one of the known kinds. Nothing is
            recorded in that case.
        """

        # Only None means "now": 0.0 is a real timestamp and must be honored.
        if when is None:
            when = time.perf_counter()

        if (what == "incremental") and (self.last_complete is None):
            # You can't have an incremental without a complete to start.
            # If this is the first reconfigure, it's a complete reconfigure.
            what = "complete"

        # Should we update all the counters?
        update_counters = True

        if what == "complete":
            # For a complete reconfigure, we need to clear all the outstanding
            # incrementals, and also remember when it happened.
            self.incrementals_outstanding = 0
            self.last_complete = when

            # A complete reconfigure also resets the last check time, because
            # we consider the complete reconfigure to be a sanity check, basically.
            # Note that it does _not_ reset any timer-logging stuff.
            self.last_check = when

            self.logger.debug(f"MARK COMPLETE @ {when}")
        elif what == "incremental":
            # For an incremental reconfigure, we need to remember that we have
            # one more incremental outstanding.
            self.incrementals_outstanding += 1

            self.logger.debug(f"MARK INCREMENTAL @ {when}")
        elif what == "diag":
            # Don't update all the counters for a diagnostic update.
            update_counters = False
        else:
            raise RuntimeError(f"ReconfigStats: unknown reconfigure type {what}")

        # If we should update the counters...
        if update_counters:
            # ...then update the counts and our reconfigures list.
            self.counts[what] += 1
            self.reconfigures.append((what, when))

            # Keep only the most recent entries. We append one at a time, so
            # dropping the oldest entry is enough to stay within the limit.
            if len(self.reconfigures) > self.MAX_RECONFIGURES_TRACKED:
                self.reconfigures.pop(0)

        # In all cases, update the number of outstanding configurations. This will
        # trigger timer logging for diagnostics updates.
        self.configs_outstanding += 1

    def needs_check(self, when: Optional[PerfCounter] = None) -> bool:
        """
        Determine if we need to do a complete reconfigure to doublecheck our
        incrementals. The logic here is that we need a check once
        max_incr_between_checks incrementals are outstanding, or once more than
        max_time_between_checks seconds have passed since the last check (or
        complete reconfigure), whichever comes first.

        :param when: Override the effective time of the check. Primarily useful for testing.
        :return: True if a check is needed, False if not
        """

        # Only None means "now": 0.0 is a real timestamp and must be honored.
        if when is None:
            when = time.perf_counter()

        if not self.reconfigures:
            # No reconfigures, so no need to check.
            return False

        # Grab information about our last reconfiguration.
        what, _ = self.reconfigures[-1]

        if what == "complete":
            # Last reconfiguration was a complete reconfiguration, so
            # no need to check.
            return False

        if self.incrementals_outstanding == 0:
            # If we have a bunch of incrementals, then we do a check, we can land
            # here with no outstanding incrementals, in which case it's pointless to
            # do a check.
            return False

        # OK, the last one was an incremental, which implies that we must have some
        # outstanding incrementals. Have we hit our maximum between checks?
        if self.incrementals_outstanding >= self.max_incr_between_checks:
            # Yup, time to check.
            return True

        # We're good for outstanding incrementals. How about the max time between checks?
        # (We must have a last check time - which may be the time of the last complete
        # reconfigure, of course - to go on at this point.)
        assert self.last_check is not None

        delta = when - self.last_check

        # True if it's been long enough, False if not.
        return delta > self.max_time_between_checks

    def needs_timers(self, when: Optional[PerfCounter] = None) -> bool:
        """
        Determine if we need to log the timers or not. The logic here is that
        we need to log every max_config_between_timers configurations or
        every max_time_between_timers seconds, whichever comes first.

        :param when: Override the effective time of the check. Primarily useful for testing.
        :return: True if we need to log timers, False if not
        """

        # Only None means "now": 0.0 is a real timestamp and must be honored.
        if when is None:
            when = time.perf_counter()

        if not self.reconfigures:
            # No reconfigures, so no need to check.
            return False

        # If we have no configurations outstanding, we're done.
        if self.configs_outstanding == 0:
            return False

        # Have we hit our maximum number of outstanding configurations?
        if self.configs_outstanding >= self.max_config_between_timers:
            # Yup, time to log.
            return True

        # We're good for outstanding configurations. How about the max time between
        # timers? Note that we may _never_ have logged timers before -- if that's the
        # case, use the time of our last complete reconfigure, which must always be
        # set, as a baseline.
        assert self.last_complete is not None

        # Explicit None test: a last_timer_log of 0.0 is a valid baseline.
        if self.last_timer_log is not None:
            baseline = self.last_timer_log
        else:
            baseline = self.last_complete

        delta = when - baseline

        # True if it's been long enough, False if not.
        return delta > self.max_time_between_timers

    def mark_checked(self, result: bool, when: Optional[PerfCounter] = None) -> None:
        """
        Mark that we have done a check, and note the results. This resets our
        outstanding incrementals to 0, and also resets our last check time.

        :param result: True if the check was good, False if not
        :param when: Override the effective time. Primarily useful for testing.
        """

        # Resolve the effective time first so that the log line shows the time
        # we actually record, rather than "None".
        if when is None:
            when = time.perf_counter()

        self.logger.debug(f"MARK_CHECKED @ {when}: {result}")

        self.incrementals_outstanding = 0
        self.checks += 1

        if not result:
            self.errors += 1

        self.last_check = when

    def mark_timers_logged(self, when: Optional[PerfCounter] = None) -> None:
        """
        Mark that we have logged timers. This resets our outstanding configurations
        to 0, and also resets our last timer log time.

        :param when: Override the effective time. Primarily useful for testing.
        """

        # Resolve the effective time first so that the log line shows the time
        # we actually record, rather than "None".
        if when is None:
            when = time.perf_counter()

        self.logger.debug(f"MARK_TIMERS @ {when}")

        self.configs_outstanding = 0
        self.last_timer_log = when

    @staticmethod
    def isofmt(when: Optional[PerfCounter], now_pc: PerfCounter, now_dt: datetime.datetime) -> str:
        """
        Render a perf_counter timestamp as an ISO-8601 string.

        The timestamp is converted by applying its offset from now_pc to now_dt,
        where now_pc and now_dt are expected to be readings of "now" on the
        perf_counter clock and as a datetime, respectively.

        :param when: The perf_counter timestamp to format, or None
        :param now_pc: perf_counter reading used as the reference point
        :param now_dt: datetime corresponding to now_pc
        :return: ISO-8601 representation of 'when', or "(none)" if 'when' is None
        """

        if when is None:
            return "(none)"

        delta = datetime.timedelta(seconds=when - now_pc)
        when_dt = now_dt + delta

        return when_dt.isoformat()

    def dump(self) -> None:
        """
        Log (at INFO level) the tracked reconfigures, the per-kind counts, the
        outstanding incrementals, the check/error totals, and the times of the
        last complete reconfigure and last check.
        """

        # Take both "now" readings together so that perf_counter values can be
        # translated into datetimes.
        now_pc = time.perf_counter()
        now_dt = datetime.datetime.now()

        for what, when in self.reconfigures:
            self.logger.info(f"CACHE: {what} reconfigure at {self.isofmt(when, now_pc, now_dt)}")

        for what in ["incremental", "complete"]:
            self.logger.info(f"CACHE: {what} count: {self.counts[what]}")

        self.logger.info(f"CACHE: incrementals outstanding: {self.incrementals_outstanding}")
        self.logger.info(f"CACHE: incremental checks: {self.checks}, errors {self.errors}")
        self.logger.info(f"CACHE: last_complete {self.isofmt(self.last_complete, now_pc, now_dt)}")
        self.logger.info(f"CACHE: last_check {self.isofmt(self.last_check, now_pc, now_dt)}")
View as plain text