...

Text file src/github.com/alecthomas/chroma/lexers/testdata/python/test_complex_file2.actual

Documentation: github.com/alecthomas/chroma/lexers/testdata/python

     1# From CPython (Lib/asyncio/subprocess.py)
     2__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
     3
     4import subprocess
     5
     6from . import events
     7from . import protocols
     8from . import streams
     9from . import tasks
    10from .log import logger
    11
    12
    13PIPE = subprocess.PIPE
    14STDOUT = subprocess.STDOUT
    15DEVNULL = subprocess.DEVNULL
    16
    17
    18class SubprocessStreamProtocol(streams.FlowControlMixin,
    19                               protocols.SubprocessProtocol):
    20    """Like StreamReaderProtocol, but for a subprocess."""
    21
    22    def __init__(self, limit, loop):
    23        super().__init__(loop=loop)
    24        self._limit = limit
    25        self.stdin = self.stdout = self.stderr = None
    26        self._transport = None
    27        self._process_exited = False
    28        self._pipe_fds = []
    29        self._stdin_closed = self._loop.create_future()
    30
    31    def __repr__(self):
    32        info = [self.__class__.__name__]
    33        if self.stdin is not None:
    34            info.append(f'stdin={self.stdin!r}')
    35        if self.stdout is not None:
    36            info.append(f'stdout={self.stdout!r}')
    37        if self.stderr is not None:
    38            info.append(f'stderr={self.stderr!r}')
    39        return '<{}>'.format(' '.join(info))
    40
    41    def connection_made(self, transport):
    42        self._transport = transport
    43
    44        stdout_transport = transport.get_pipe_transport(1)
    45        if stdout_transport is not None:
    46            self.stdout = streams.StreamReader(limit=self._limit,
    47                                               loop=self._loop)
    48            self.stdout.set_transport(stdout_transport)
    49            self._pipe_fds.append(1)
    50
    51        stderr_transport = transport.get_pipe_transport(2)
    52        if stderr_transport is not None:
    53            self.stderr = streams.StreamReader(limit=self._limit,
    54                                               loop=self._loop)
    55            self.stderr.set_transport(stderr_transport)
    56            self._pipe_fds.append(2)
    57
    58        stdin_transport = transport.get_pipe_transport(0)
    59        if stdin_transport is not None:
    60            self.stdin = streams.StreamWriter(stdin_transport,
    61                                              protocol=self,
    62                                              reader=None,
    63                                              loop=self._loop)
    64
    65    def pipe_data_received(self, fd, data):
    66        if fd == 1:
    67            reader = self.stdout
    68        elif fd == 2:
    69            reader = self.stderr
    70        else:
    71            reader = None
    72        if reader is not None:
    73            reader.feed_data(data)
    74
    75    def pipe_connection_lost(self, fd, exc):
    76        if fd == 0:
    77            pipe = self.stdin
    78            if pipe is not None:
    79                pipe.close()
    80            self.connection_lost(exc)
    81            if exc is None:
    82                self._stdin_closed.set_result(None)
    83            else:
    84                self._stdin_closed.set_exception(exc)
    85            return
    86        if fd == 1:
    87            reader = self.stdout
    88        elif fd == 2:
    89            reader = self.stderr
    90        else:
    91            reader = None
    92        if reader is not None:
    93            if exc is None:
    94                reader.feed_eof()
    95            else:
    96                reader.set_exception(exc)
    97
    98        if fd in self._pipe_fds:
    99            self._pipe_fds.remove(fd)
   100        self._maybe_close_transport()
   101
   102    def process_exited(self):
   103        self._process_exited = True
   104        self._maybe_close_transport()
   105
   106    def _maybe_close_transport(self):
   107        if len(self._pipe_fds) == 0 and self._process_exited:
   108            self._transport.close()
   109            self._transport = None
   110
   111    def _get_close_waiter(self, stream):
   112        if stream is self.stdin:
   113            return self._stdin_closed
   114
   115
   116class Process:
   117    def __init__(self, transport, protocol, loop):
   118        self._transport = transport
   119        self._protocol = protocol
   120        self._loop = loop
   121        self.stdin = protocol.stdin
   122        self.stdout = protocol.stdout
   123        self.stderr = protocol.stderr
   124        self.pid = transport.get_pid()
   125
   126    def __repr__(self):
   127        return f'<{self.__class__.__name__} {self.pid}>'
   128
   129    @property
   130    def returncode(self):
   131        return self._transport.get_returncode()
   132
   133    async def wait(self):
   134        """Wait until the process exit and return the process return code."""
   135        return await self._transport._wait()
   136
   137    def send_signal(self, signal):
   138        self._transport.send_signal(signal)
   139
   140    def terminate(self):
   141        self._transport.terminate()
   142
   143    def kill(self):
   144        self._transport.kill()
   145
   146    async def _feed_stdin(self, input):
   147        debug = self._loop.get_debug()
   148        self.stdin.write(input)
   149        if debug:
   150            logger.debug(
   151                '%r communicate: feed stdin (%s bytes)', self, len(input))
   152        try:
   153            await self.stdin.drain()
   154        except (BrokenPipeError, ConnectionResetError) as exc:
   155            # communicate() ignores BrokenPipeError and ConnectionResetError
   156            if debug:
   157                logger.debug('%r communicate: stdin got %r', self, exc)
   158
   159        if debug:
   160            logger.debug('%r communicate: close stdin', self)
   161        self.stdin.close()
   162
   163    async def _noop(self):
   164        return None
   165
   166    async def _read_stream(self, fd):
   167        transport = self._transport.get_pipe_transport(fd)
   168        if fd == 2:
   169            stream = self.stderr
   170        else:
   171            assert fd == 1
   172            stream = self.stdout
   173        if self._loop.get_debug():
   174            name = 'stdout' if fd == 1 else 'stderr'
   175            logger.debug('%r communicate: read %s', self, name)
   176        output = await stream.read()
   177        if self._loop.get_debug():
   178            name = 'stdout' if fd == 1 else 'stderr'
   179            logger.debug('%r communicate: close %s', self, name)
   180        transport.close()
   181        return output
   182
   183    async def communicate(self, input=None):
   184        if input is not None:
   185            stdin = self._feed_stdin(input)
   186        else:
   187            stdin = self._noop()
   188        if self.stdout is not None:
   189            stdout = self._read_stream(1)
   190        else:
   191            stdout = self._noop()
   192        if self.stderr is not None:
   193            stderr = self._read_stream(2)
   194        else:
   195            stderr = self._noop()
   196        stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
   197        await self.wait()
   198        return (stdout, stderr)
   199
   200
   201async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
   202                                  limit=streams._DEFAULT_LIMIT, **kwds):
   203    loop = events.get_running_loop()
   204    protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
   205                                                        loop=loop)
   206    transport, protocol = await loop.subprocess_shell(
   207        protocol_factory,
   208        cmd, stdin=stdin, stdout=stdout,
   209        stderr=stderr, **kwds)
   210    return Process(transport, protocol, loop)
   211
   212
   213async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
   214                                 stderr=None, limit=streams._DEFAULT_LIMIT,
   215                                 **kwds):
   216    loop = events.get_running_loop()
   217    protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
   218                                                        loop=loop)
   219    transport, protocol = await loop.subprocess_exec(
   220        protocol_factory,
   221        program, *args,
   222        stdin=stdin, stdout=stdout,
   223        stderr=stderr, **kwds)
   224    return Process(transport, protocol, loop)

View as plain text