...

Text file src/github.com/emissary-ingress/emissary/v3/releng/lib/uiutil.py

Documentation: github.com/emissary-ingress/emissary/v3/releng/lib

     1#!/hint/python3
     2
     3import io
     4import os
     5import re
     6import shutil
     7import string
     8import subprocess
     9import sys
    10from contextlib import contextmanager
    11from traceback import print_exc
    12from typing import Callable, Generator, List, Optional, TextIO, Tuple
    13
    14from . import ansiterm
    15
    16# run()/run_bincapture()/run_txtcapture() and capture_output() seem like they're
    17# re-implementing something that should already exist for us to use.  And
    18# indeed, the `run*()` functions are essentially just stdlib `subprocess.run()`
    19# and `capture_output()` is essentially just stdlib
    20# `contextlib.redirect_stdout()`+`contextlib.redirect_stderr()`.  But the big
    21# reason for them to exist here is: `contextlib.redirect_*` and `subprocess`
    22# don't play together!  It's infuriating.
    23#
    24# So we define a global `_capturing` that is set while `capture_output()` is
    25# running, and have the `run*()` functions adjust their behavior if it's set.
    26# We could more generally do this by wrapping either `subprocess.run()` or
    27# `contextlib.redirect_*()`.  If we only ever called the redirect/capture
    28# function on a real file with a real file descriptor, it would be hairy, but
    29# not _too_ hairy[1].  But we want to call the redirect/capture function with
    30# not-a-real-file things like Indent or LineTracker.  So we'd have to get even
    31# hairier... we'd have to do a bunch of extra stuff when the output's
    32# `.fileno()` raises io.UnsupportedOperation; the same way that Go's
    33# `os/exec.Cmd` has to do extra stuff when the output ins't an `*os.File` (and
    34# that's one of the big reasons why I've said that Go's "os/exec" is superior to
    35# other languages subprocess facilities).  And it's my best judgment that just
    36# special-casing it with `_capturing` is the better choice than taking on all
    37# the complexity of mimicing Go's brilliance.
    38#
    39# [1]: https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
    40
    41_capturing = False
    42
    43
    44def check_command(args) -> bool:
    45    p = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    46    return p.returncode == 0
    47
    48
    49def run(args: List[str]) -> None:
    50    """run is like "subprocess.run(args)", but with helpful settings and
    51    obeys "with capture_output(out)".
    52    """
    53    if _capturing:
    54        try:
    55            subprocess.run(args, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    56        except subprocess.CalledProcessError as err:
    57            raise Exception(f"{err.stdout}{err}") from err
    58    else:
    59        subprocess.run(args, check=True)
    60
    61
    62def run_bincapture(args: List[str]) -> bytes:
    63    """run is like "subprocess.run(args, capture_out=True, text=False)",
    64    but with helpful settings and obeys "with capture_output(out)".
    65    """
    66    if _capturing:
    67        try:
    68            return subprocess.run(args, check=True, capture_output=True).stdout
    69        except subprocess.CalledProcessError as err:
    70            raise Exception(f"{err.stderr.decode('UTF-8')}{err}") from err
    71    else:
    72        return subprocess.run(args, check=True, stdout=subprocess.PIPE).stdout
    73
    74
    75def run_txtcapture(args: List[str]) -> str:
    76    """run is like "subprocess.run(args, capture_out=True, text=true)",
    77    but with helpful settings and obeys "with capture_output(out)".
    78    """
    79    if _capturing:
    80        try:
    81            out = subprocess.run(args, check=True, capture_output=True, text=True).stdout
    82        except subprocess.CalledProcessError as err:
    83            raise Exception(f"{err.stderr}{err}") from err
    84    else:
    85        out = subprocess.run(args, check=True, stdout=subprocess.PIPE, text=True).stdout
    86    if out.endswith("\n"):
    87        out = out[:-1]
    88    return out
    89
    90
    91@contextmanager
    92def capture_output(log: io.StringIO) -> Generator[None, None, None]:
    93    """capture_output is like contextlib.redirect_stdout but also
    94    redirects stderr, and also does some extra stuff so that we can
    95    have run/run_bincapture/run_txtcapture functions that obey it.
    96    """
    97    global _capturing
    98
    99    saved_capturing = _capturing
   100    saved_stdout = sys.stdout
   101    saved_stderr = sys.stderr
   102
   103    _capturing = True
   104    sys.stdout = sys.stderr = log
   105    try:
   106        yield
   107    finally:
   108        _capturing = saved_capturing
   109        sys.stdout = saved_stdout
   110        sys.stderr = saved_stderr
   111
   112
   113def _lex_char_or_cs(text: str) -> Tuple[str, str]:
   114    """Look atthe beginning of the given text and trim either a byte, or
   115    an ANSI control sequence from the beginning, returning a tuple
   116    ("char-or-cs", "remaining-text").  If it looks like the text is a
   117    truncated control seqence, then it doesn't trim anything, and
   118    returns ("", "original"); signaling that it needs to wait for more
   119    input before successfully lexing anything.
   120    """
   121    if text == '\033':
   122        # wait to see if this is a control sequence
   123        return '', text
   124    i = 1
   125    if text.startswith('\033['):
   126        try:
   127            i = len('\033[')
   128            while text[i] not in string.ascii_letters:
   129                i += 1
   130            i += 1
   131        except IndexError:
   132            # wait for a complete control sequence
   133            return '', text
   134    return text[:i], text[i:]
   135
   136
   137class Indent(io.StringIO):
   138    """Indent() is like a io.StringIO(), will indent text with the given
   139    string.
   140    """
   141    def __init__(self, indent: str = "", output: Optional[TextIO] = None, columns: Optional[int] = None) -> None:
   142        """Arguments:
   143          indent: str: The string to indent with.
   144          output: Optional[TextIO]: A TextIO to write to, instead of
   145                  building an in-memory buffer.
   146          columns: Optional[int]: How wide the terminal is; this is
   147                   imporant because a line wrap needs to trigger an
   148                   indent.  If not given, then 'output.columns' is
   149                   used if 'output' is set and has a 'columns'
   150                   attribute, otherwise shutil.get_terminal_size() is
   151                   used.  Use a value <= 0 to explicitly disable
   152                   wrapping.
   153        The 'columns' attribute on the resulting object is set to the
   154        number of usable colums; "arg_columns - len(indent)".  This
   155        allows Indent objects to be nested.
   156        Indent understands "\r" and "\n", but not "\t" or ANSI control
   157        sequences that move the cursor; it assumes that all ANSI
   158        control sequences do not move the cursor.
   159        """
   160        super().__init__()
   161        self._indent = indent
   162        self._output = output
   163
   164        if columns is None:
   165            if output and hasattr(output, 'columns'):
   166                columns = output.columns  # type: ignore
   167            else:
   168                columns = shutil.get_terminal_size().columns
   169        self.columns = columns - len(self._indent)
   170
   171    _rest = ""
   172    _cur_col = 0
   173    # 0: no indent has been printed for this line, and indent will need to be printed unless this is the final trailing NL
   174    # 1: an indent needs to be printed for this line IFF there is any more output on it
   175    # 2: no indent (currently) needs to be printed for this line
   176    _print_indent = 0
   177
   178    def write(self, text: str) -> int:
   179        # This algorithm is based on
   180        # https://git.parabola.nu/packages/libretools.git/tree/src/chroot-tools/indent
   181        self._rest += text
   182
   183        while self._rest:
   184            c, self._rest = _lex_char_or_cs(self._rest)
   185            if c == "":
   186                # wait for more input
   187                break
   188            elif c == "\n":
   189                if self._print_indent < 1:
   190                    self._write(self._indent)
   191                self._write(c)
   192                self._print_indent = 0
   193                self._cur_col = 0
   194            elif c == "\r":
   195                self._write(c)
   196                self._print_indent = min(self._print_indent, 1)
   197                self._cur_col = 0
   198            elif c.startswith('\033['):
   199                if self._print_indent < 2:
   200                    self._write(self._indent)
   201                self._write(c)
   202                self._print_indent = 2
   203            elif self.columns > 0 and self._cur_col >= self.columns:
   204                self._rest = "\n" + c + self._rest
   205            else:
   206                if self._print_indent < 2:
   207                    self._write(self._indent)
   208                self._write(c)
   209                self._print_indent = 2
   210                self._cur_col += len(c)
   211        return len(text)
   212
   213    def _write(self, text: str) -> None:
   214        if self._output:
   215            self._output.write(text)
   216        else:
   217            super().write(text)
   218
   219    def flush(self) -> None:
   220        if self._output:
   221            self._output.flush()
   222        else:
   223            super().flush()
   224
   225    def input(self) -> str:
   226        """Use "myindent.input()" instead of "input()" in order to nest well
   227        with LineTrackers.
   228        """
   229        if hasattr(self._output, 'input'):
   230            text: str = self._output.input()  # type: ignore
   231        else:
   232            text = input()
   233        return text
   234
   235
   236class LineTracker(io.StringIO):
   237    """LineTracker() is like a io.StringIO(), but will keep track of which
   238    line you're on; starting on line "1".
   239    LineTracker understands "\n", and the "cursor-up" (CSI-A) control
   240    sequence.  It does not detect wrapped lines; use Indent() to turn
   241    those in to hard-wraps that LineTracker understands.
   242    """
   243    def __init__(self, output: Optional[TextIO] = None) -> None:
   244        self._output = output
   245        if output and hasattr(output, 'columns'):
   246            self.columns = output.columns  # type: ignore
   247
   248    cur_line = 1
   249
   250    _rest = ""
   251
   252    def _handle(self, text: str) -> None:
   253        self._rest += text
   254        while self._rest:
   255            c, self._rest = _lex_char_or_cs(self._rest)
   256            if c == "":
   257                # wait for more input
   258                break
   259            elif c == "\n":
   260                self.cur_line += 1
   261            elif c.startswith("\033[") and c.endswith('A'):
   262                lines = int(c[len("\033["):-len('A')] or "1")
   263                self.cur_line -= lines
   264
   265    def input(self) -> str:
   266        """Use "mylinetracker.input()" instead of "input()" to avoid the
   267        LineTracker not seeing any newlines input by the user.
   268        """
   269        if hasattr(self._output, 'input'):
   270            text: str = self._output.input()  # type: ignore
   271        else:
   272            text = input()
   273        self._handle(text + "\n")
   274        return text
   275
   276    def goto_line(self, line: int) -> None:
   277        """goto_line moves the cursor to the beginning of the given line;
   278        where line 1 is the line that the LineTracker started on, line
   279        0 is the line above that, and line 1 is the line below
   280        that.
   281        """
   282        self.write("\r")
   283        if line < self.cur_line:
   284            total_lines = shutil.get_terminal_size().lines
   285            if (self.cur_line - line) >= total_lines:
   286                raise Exception(f"cannot go back {self.cur_line - line} lines (limit={total_lines - 1})")
   287            self.write(ansiterm.cursor_up(self.cur_line - line))
   288        else:
   289            self.write("\n" * (line - self.cur_line))
   290
   291    def write(self, text: str) -> int:
   292        self._handle(text)
   293        if self._output:
   294            return self._output.write(text)
   295        else:
   296            return super().write(text)
   297
   298    def flush(self) -> None:
   299        if self._output:
   300            self._output.flush()
   301        else:
   302            super().flush()
   303
   304
   305class Checker:
   306    """Checker is a terminal UI widget for printing a series of '[....]'
   307    (running) / '[ OK ]' / '[FAIL]' checks where we can diagnostic
   308    output while the check is running, and then go back and update the
   309    status, and nest checks.
   310    """
   311
   312    ok: bool = True
   313
   314    @contextmanager
   315    def check(self, name: str, clear_on_success: bool = True) -> Generator['CheckResult', None, None]:
   316        """check returns a context manager that handles printing a '[....]'  /
   317        '[ OK ]' / '[FAIL]' check.  While the check is running, it
   318        will stream whatever you write to stdout/stderr.  If
   319        clear_on_success is True, then once the check finishes, if the
   320        check passed then it will erase that stdout/stderr output,
   321        since you probably only want diagnostic output if the check
   322        fails.
   323        You can provide a (1-line) textual check result that will be
   324        shown on both success and failure by writing to "mycheck.result".
   325        You may cause a check to fail by either raising an Exception,
   326        or by setting "mycheck.ok = False".  If you do neither of these,
   327        then the check will be considered to pass.
   328        The mycheck.subcheck method returns a context manager for a
   329        nested child check.
   330        """
   331        def line(status: str, rest: Optional[str] = None) -> str:
   332            txt = name
   333            if rest:
   334                txt = f'{txt}: {rest}'
   335            return f" {status}{ansiterm.sgr} {txt}"
   336
   337        output = LineTracker(output=sys.stdout)
   338        output.write(line(status=f'{ansiterm.sgr.bold.fg_blu}[....]') + "\n")
   339
   340        check = CheckResult()
   341
   342        with capture_output(Indent(output=output, indent="   > ")):
   343            try:
   344                yield check
   345            except Exception as err:
   346                if str(err).strip():
   347                    print(err)
   348                check.ok = False
   349
   350        end = output.cur_line
   351        output.goto_line(1)
   352        if check.ok:
   353            output.write(line(status=f'{ansiterm.sgr.bold.fg_grn}[ OK ]', rest=check.result))
   354        else:
   355            output.write(line(status=f'{ansiterm.sgr.bold.fg_red}[FAIL]', rest=check.result))
   356        if check.ok and clear_on_success:
   357            output.write(ansiterm.clear_rest_of_screen + "\n")
   358        else:
   359            output.write(ansiterm.clear_rest_of_line)
   360            output.goto_line(end)
   361
   362        self.ok &= check.ok
   363
   364    # alias for readability
   365    subcheck = check
   366
   367
   368class CheckResult(Checker):
   369    """A CheckResult is the context manager type returned by
   370    "Checker.check".
   371    """
   372    result: Optional[str] = None

View as plain text