1#!/hint/python3
2
3import io
4import os
5import re
6import shutil
7import string
8import subprocess
9import sys
10from contextlib import contextmanager
11from traceback import print_exc
12from typing import Callable, Generator, List, Optional, TextIO, Tuple
13
14from . import ansiterm
15
16# run()/run_bincapture()/run_txtcapture() and capture_output() seem like they're
17# re-implementing something that should already exist for us to use. And
18# indeed, the `run*()` functions are essentially just stdlib `subprocess.run()`
19# and `capture_output()` is essentially just stdlib
20# `contextlib.redirect_stdout()`+`contextlib.redirect_stderr()`. But the big
21# reason for them to exist here is: `contextlib.redirect_*` and `subprocess`
22# don't play together! It's infuriating.
23#
24# So we define a global `_capturing` that is set while `capture_output()` is
25# running, and have the `run*()` functions adjust their behavior if it's set.
26# We could more generally do this by wrapping either `subprocess.run()` or
27# `contextlib.redirect_*()`. If we only ever called the redirect/capture
28# function on a real file with a real file descriptor, it would be hairy, but
29# not _too_ hairy[1]. But we want to call the redirect/capture function with
30# not-a-real-file things like Indent or LineTracker. So we'd have to get even
31# hairier... we'd have to do a bunch of extra stuff when the output's
32# `.fileno()` raises io.UnsupportedOperation; the same way that Go's
33# `os/exec.Cmd` has to do extra stuff when the output ins't an `*os.File` (and
34# that's one of the big reasons why I've said that Go's "os/exec" is superior to
35# other languages subprocess facilities). And it's my best judgment that just
36# special-casing it with `_capturing` is the better choice than taking on all
37# the complexity of mimicing Go's brilliance.
38#
39# [1]: https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
40
41_capturing = False
42
43
44def check_command(args) -> bool:
45 p = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
46 return p.returncode == 0
47
48
49def run(args: List[str]) -> None:
50 """run is like "subprocess.run(args)", but with helpful settings and
51 obeys "with capture_output(out)".
52 """
53 if _capturing:
54 try:
55 subprocess.run(args, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
56 except subprocess.CalledProcessError as err:
57 raise Exception(f"{err.stdout}{err}") from err
58 else:
59 subprocess.run(args, check=True)
60
61
62def run_bincapture(args: List[str]) -> bytes:
63 """run is like "subprocess.run(args, capture_out=True, text=False)",
64 but with helpful settings and obeys "with capture_output(out)".
65 """
66 if _capturing:
67 try:
68 return subprocess.run(args, check=True, capture_output=True).stdout
69 except subprocess.CalledProcessError as err:
70 raise Exception(f"{err.stderr.decode('UTF-8')}{err}") from err
71 else:
72 return subprocess.run(args, check=True, stdout=subprocess.PIPE).stdout
73
74
75def run_txtcapture(args: List[str]) -> str:
76 """run is like "subprocess.run(args, capture_out=True, text=true)",
77 but with helpful settings and obeys "with capture_output(out)".
78 """
79 if _capturing:
80 try:
81 out = subprocess.run(args, check=True, capture_output=True, text=True).stdout
82 except subprocess.CalledProcessError as err:
83 raise Exception(f"{err.stderr}{err}") from err
84 else:
85 out = subprocess.run(args, check=True, stdout=subprocess.PIPE, text=True).stdout
86 if out.endswith("\n"):
87 out = out[:-1]
88 return out
89
90
91@contextmanager
92def capture_output(log: io.StringIO) -> Generator[None, None, None]:
93 """capture_output is like contextlib.redirect_stdout but also
94 redirects stderr, and also does some extra stuff so that we can
95 have run/run_bincapture/run_txtcapture functions that obey it.
96 """
97 global _capturing
98
99 saved_capturing = _capturing
100 saved_stdout = sys.stdout
101 saved_stderr = sys.stderr
102
103 _capturing = True
104 sys.stdout = sys.stderr = log
105 try:
106 yield
107 finally:
108 _capturing = saved_capturing
109 sys.stdout = saved_stdout
110 sys.stderr = saved_stderr
111
112
113def _lex_char_or_cs(text: str) -> Tuple[str, str]:
114 """Look atthe beginning of the given text and trim either a byte, or
115 an ANSI control sequence from the beginning, returning a tuple
116 ("char-or-cs", "remaining-text"). If it looks like the text is a
117 truncated control seqence, then it doesn't trim anything, and
118 returns ("", "original"); signaling that it needs to wait for more
119 input before successfully lexing anything.
120 """
121 if text == '\033':
122 # wait to see if this is a control sequence
123 return '', text
124 i = 1
125 if text.startswith('\033['):
126 try:
127 i = len('\033[')
128 while text[i] not in string.ascii_letters:
129 i += 1
130 i += 1
131 except IndexError:
132 # wait for a complete control sequence
133 return '', text
134 return text[:i], text[i:]
135
136
137class Indent(io.StringIO):
138 """Indent() is like a io.StringIO(), will indent text with the given
139 string.
140 """
141 def __init__(self, indent: str = "", output: Optional[TextIO] = None, columns: Optional[int] = None) -> None:
142 """Arguments:
143 indent: str: The string to indent with.
144 output: Optional[TextIO]: A TextIO to write to, instead of
145 building an in-memory buffer.
146 columns: Optional[int]: How wide the terminal is; this is
147 imporant because a line wrap needs to trigger an
148 indent. If not given, then 'output.columns' is
149 used if 'output' is set and has a 'columns'
150 attribute, otherwise shutil.get_terminal_size() is
151 used. Use a value <= 0 to explicitly disable
152 wrapping.
153 The 'columns' attribute on the resulting object is set to the
154 number of usable colums; "arg_columns - len(indent)". This
155 allows Indent objects to be nested.
156 Indent understands "\r" and "\n", but not "\t" or ANSI control
157 sequences that move the cursor; it assumes that all ANSI
158 control sequences do not move the cursor.
159 """
160 super().__init__()
161 self._indent = indent
162 self._output = output
163
164 if columns is None:
165 if output and hasattr(output, 'columns'):
166 columns = output.columns # type: ignore
167 else:
168 columns = shutil.get_terminal_size().columns
169 self.columns = columns - len(self._indent)
170
171 _rest = ""
172 _cur_col = 0
173 # 0: no indent has been printed for this line, and indent will need to be printed unless this is the final trailing NL
174 # 1: an indent needs to be printed for this line IFF there is any more output on it
175 # 2: no indent (currently) needs to be printed for this line
176 _print_indent = 0
177
178 def write(self, text: str) -> int:
179 # This algorithm is based on
180 # https://git.parabola.nu/packages/libretools.git/tree/src/chroot-tools/indent
181 self._rest += text
182
183 while self._rest:
184 c, self._rest = _lex_char_or_cs(self._rest)
185 if c == "":
186 # wait for more input
187 break
188 elif c == "\n":
189 if self._print_indent < 1:
190 self._write(self._indent)
191 self._write(c)
192 self._print_indent = 0
193 self._cur_col = 0
194 elif c == "\r":
195 self._write(c)
196 self._print_indent = min(self._print_indent, 1)
197 self._cur_col = 0
198 elif c.startswith('\033['):
199 if self._print_indent < 2:
200 self._write(self._indent)
201 self._write(c)
202 self._print_indent = 2
203 elif self.columns > 0 and self._cur_col >= self.columns:
204 self._rest = "\n" + c + self._rest
205 else:
206 if self._print_indent < 2:
207 self._write(self._indent)
208 self._write(c)
209 self._print_indent = 2
210 self._cur_col += len(c)
211 return len(text)
212
213 def _write(self, text: str) -> None:
214 if self._output:
215 self._output.write(text)
216 else:
217 super().write(text)
218
219 def flush(self) -> None:
220 if self._output:
221 self._output.flush()
222 else:
223 super().flush()
224
225 def input(self) -> str:
226 """Use "myindent.input()" instead of "input()" in order to nest well
227 with LineTrackers.
228 """
229 if hasattr(self._output, 'input'):
230 text: str = self._output.input() # type: ignore
231 else:
232 text = input()
233 return text
234
235
236class LineTracker(io.StringIO):
237 """LineTracker() is like a io.StringIO(), but will keep track of which
238 line you're on; starting on line "1".
239 LineTracker understands "\n", and the "cursor-up" (CSI-A) control
240 sequence. It does not detect wrapped lines; use Indent() to turn
241 those in to hard-wraps that LineTracker understands.
242 """
243 def __init__(self, output: Optional[TextIO] = None) -> None:
244 self._output = output
245 if output and hasattr(output, 'columns'):
246 self.columns = output.columns # type: ignore
247
248 cur_line = 1
249
250 _rest = ""
251
252 def _handle(self, text: str) -> None:
253 self._rest += text
254 while self._rest:
255 c, self._rest = _lex_char_or_cs(self._rest)
256 if c == "":
257 # wait for more input
258 break
259 elif c == "\n":
260 self.cur_line += 1
261 elif c.startswith("\033[") and c.endswith('A'):
262 lines = int(c[len("\033["):-len('A')] or "1")
263 self.cur_line -= lines
264
265 def input(self) -> str:
266 """Use "mylinetracker.input()" instead of "input()" to avoid the
267 LineTracker not seeing any newlines input by the user.
268 """
269 if hasattr(self._output, 'input'):
270 text: str = self._output.input() # type: ignore
271 else:
272 text = input()
273 self._handle(text + "\n")
274 return text
275
276 def goto_line(self, line: int) -> None:
277 """goto_line moves the cursor to the beginning of the given line;
278 where line 1 is the line that the LineTracker started on, line
279 0 is the line above that, and line 1 is the line below
280 that.
281 """
282 self.write("\r")
283 if line < self.cur_line:
284 total_lines = shutil.get_terminal_size().lines
285 if (self.cur_line - line) >= total_lines:
286 raise Exception(f"cannot go back {self.cur_line - line} lines (limit={total_lines - 1})")
287 self.write(ansiterm.cursor_up(self.cur_line - line))
288 else:
289 self.write("\n" * (line - self.cur_line))
290
291 def write(self, text: str) -> int:
292 self._handle(text)
293 if self._output:
294 return self._output.write(text)
295 else:
296 return super().write(text)
297
298 def flush(self) -> None:
299 if self._output:
300 self._output.flush()
301 else:
302 super().flush()
303
304
305class Checker:
306 """Checker is a terminal UI widget for printing a series of '[....]'
307 (running) / '[ OK ]' / '[FAIL]' checks where we can diagnostic
308 output while the check is running, and then go back and update the
309 status, and nest checks.
310 """
311
312 ok: bool = True
313
314 @contextmanager
315 def check(self, name: str, clear_on_success: bool = True) -> Generator['CheckResult', None, None]:
316 """check returns a context manager that handles printing a '[....]' /
317 '[ OK ]' / '[FAIL]' check. While the check is running, it
318 will stream whatever you write to stdout/stderr. If
319 clear_on_success is True, then once the check finishes, if the
320 check passed then it will erase that stdout/stderr output,
321 since you probably only want diagnostic output if the check
322 fails.
323 You can provide a (1-line) textual check result that will be
324 shown on both success and failure by writing to "mycheck.result".
325 You may cause a check to fail by either raising an Exception,
326 or by setting "mycheck.ok = False". If you do neither of these,
327 then the check will be considered to pass.
328 The mycheck.subcheck method returns a context manager for a
329 nested child check.
330 """
331 def line(status: str, rest: Optional[str] = None) -> str:
332 txt = name
333 if rest:
334 txt = f'{txt}: {rest}'
335 return f" {status}{ansiterm.sgr} {txt}"
336
337 output = LineTracker(output=sys.stdout)
338 output.write(line(status=f'{ansiterm.sgr.bold.fg_blu}[....]') + "\n")
339
340 check = CheckResult()
341
342 with capture_output(Indent(output=output, indent=" > ")):
343 try:
344 yield check
345 except Exception as err:
346 if str(err).strip():
347 print(err)
348 check.ok = False
349
350 end = output.cur_line
351 output.goto_line(1)
352 if check.ok:
353 output.write(line(status=f'{ansiterm.sgr.bold.fg_grn}[ OK ]', rest=check.result))
354 else:
355 output.write(line(status=f'{ansiterm.sgr.bold.fg_red}[FAIL]', rest=check.result))
356 if check.ok and clear_on_success:
357 output.write(ansiterm.clear_rest_of_screen + "\n")
358 else:
359 output.write(ansiterm.clear_rest_of_line)
360 output.goto_line(end)
361
362 self.ok &= check.ok
363
364 # alias for readability
365 subcheck = check
366
367
368class CheckResult(Checker):
369 """A CheckResult is the context manager type returned by
370 "Checker.check".
371 """
372 result: Optional[str] = None
View as plain text