...

Text file src/github.com/alecthomas/chroma/v2/lexers/testdata/python/test_complex_file1.actual

Documentation: github.com/alecthomas/chroma/v2/lexers/testdata/python

     1# From CPython (Lib/asyncio/coroutines.py)
     2__all__ = 'coroutine', 'iscoroutinefunction', 'iscoroutine'
     3
     4import collections.abc
     5import functools
     6import inspect
     7import os
     8import sys
     9import traceback
    10import types
    11import warnings
    12
    13from . import base_futures
    14from . import constants
    15from . import format_helpers
    16from .log import logger
    17
    18
    19def _is_debug_mode():
    20    # If you set _DEBUG to true, @coroutine will wrap the resulting
    21    # generator objects in a CoroWrapper instance (defined below).  That
    22    # instance will log a message when the generator is never iterated
    23    # over, which may happen when you forget to use "await" or "yield from"
    24    # with a coroutine call.
    25    # Note that the value of the _DEBUG flag is taken
    26    # when the decorator is used, so to be of any use it must be set
    27    # before you define your coroutines.  A downside of using this feature
    28    # is that tracebacks show entries for the CoroWrapper.__next__ method
    29    # when _DEBUG is true.
    30    return sys.flags.dev_mode or (not sys.flags.ignore_environment and
    31                                  bool(os.environ.get('PYTHONASYNCIODEBUG')))
    32
    33
    34_DEBUG = _is_debug_mode()
    35
    36
    37class CoroWrapper:
    38    # Wrapper for coroutine object in _DEBUG mode.
    39
    40    def __init__(self, gen, func=None):
    41        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
    42        self.gen = gen
    43        self.func = func  # Used to unwrap @coroutine decorator
    44        self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
    45        self.__name__ = getattr(gen, '__name__', None)
    46        self.__qualname__ = getattr(gen, '__qualname__', None)
    47
    48    def __repr__(self):
    49        coro_repr = _format_coroutine(self)
    50        if self._source_traceback:
    51            frame = self._source_traceback[-1]
    52            coro_repr += f', created at {frame[0]}:{frame[1]}'
    53
    54        return f'<{self.__class__.__name__} {coro_repr}>'
    55
    56    def __iter__(self):
    57        return self
    58
    59    def __next__(self):
    60        return self.gen.send(None)
    61
    62    def send(self, value):
    63        return self.gen.send(value)
    64
    65    def throw(self, type, value=None, traceback=None):
    66        return self.gen.throw(type, value, traceback)
    67
    68    def close(self):
    69        return self.gen.close()
    70
    71    @property
    72    def gi_frame(self):
    73        return self.gen.gi_frame
    74
    75    @property
    76    def gi_running(self):
    77        return self.gen.gi_running
    78
    79    @property
    80    def gi_code(self):
    81        return self.gen.gi_code
    82
    83    def __await__(self):
    84        return self
    85
    86    @property
    87    def gi_yieldfrom(self):
    88        return self.gen.gi_yieldfrom
    89
    90    def __del__(self):
    91        # Be careful accessing self.gen.frame -- self.gen might not exist.
    92        gen = getattr(self, 'gen', None)
    93        frame = getattr(gen, 'gi_frame', None)
    94        if frame is not None and frame.f_lasti == -1:
    95            msg = f'{self!r} was never yielded from'
    96            tb = getattr(self, '_source_traceback', ())
    97            if tb:
    98                tb = ''.join(traceback.format_list(tb))
    99                msg += (f'\nCoroutine object created at '
   100                        f'(most recent call last, truncated to '
   101                        f'{constants.DEBUG_STACK_DEPTH} last lines):\n')
   102                msg += tb.rstrip()
   103            logger.error(msg)
   104
   105
   106def coroutine(func):
   107    """Decorator to mark coroutines.
   108
   109    If the coroutine is not yielded from before it is destroyed,
   110    an error message is logged.
   111    """
   112    warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
   113                  DeprecationWarning,
   114                  stacklevel=2)
   115    if inspect.iscoroutinefunction(func):
   116        # In Python 3.5 that's all we need to do for coroutines
   117        # defined with "async def".
   118        return func
   119
   120    if inspect.isgeneratorfunction(func):
   121        coro = func
   122    else:
   123        @functools.wraps(func)
   124        def coro(*args, **kw):
   125            res = func(*args, **kw)
   126            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
   127                    isinstance(res, CoroWrapper)):
   128                res = yield from res
   129            else:
   130                # If 'res' is an awaitable, run it.
   131                try:
   132                    await_meth = res.__await__
   133                except AttributeError:
   134                    pass
   135                else:
   136                    if isinstance(res, collections.abc.Awaitable):
   137                        res = yield from await_meth()
   138            return res
   139
   140    coro = types.coroutine(coro)
   141    if not _DEBUG:
   142        wrapper = coro
   143    else:
   144        @functools.wraps(func)
   145        def wrapper(*args, **kwds):
   146            w = CoroWrapper(coro(*args, **kwds), func=func)
   147            if w._source_traceback:
   148                del w._source_traceback[-1]
   149            # Python < 3.5 does not implement __qualname__
   150            # on generator objects, so we set it manually.
   151            # We use getattr as some callables (such as
   152            # functools.partial may lack __qualname__).
   153            w.__name__ = getattr(func, '__name__', None)
   154            w.__qualname__ = getattr(func, '__qualname__', None)
   155            return w
   156
   157    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
   158    return wrapper
   159
   160
   161# A marker for iscoroutinefunction.
   162_is_coroutine = object()
   163
   164
   165def iscoroutinefunction(func):
   166    """Return True if func is a decorated coroutine function."""
   167    return (inspect.iscoroutinefunction(func) or
   168            getattr(func, '_is_coroutine', None) is _is_coroutine)
   169
   170
   171# Prioritize the native coroutine check to speed up
   172# asyncio.iscoroutine.
   173_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
   174                    collections.abc.Coroutine, CoroWrapper)
   175_iscoroutine_typecache = set()
   176
   177
   178def iscoroutine(obj):
   179    """Return True if obj is a coroutine object."""
   180    if type(obj) in _iscoroutine_typecache:
   181        return True
   182
   183    if isinstance(obj, _COROUTINE_TYPES):
   184        # Just in case we don't want to cache more than 100
   185        # positive types.  That shouldn't ever happen, unless
   186        # someone stressing the system on purpose.
   187        if len(_iscoroutine_typecache) < 100:
   188            _iscoroutine_typecache.add(type(obj))
   189        return True
   190    else:
   191        return False
   192
   193
   194def _format_coroutine(coro):
   195    assert iscoroutine(coro)
   196
   197    is_corowrapper = isinstance(coro, CoroWrapper)
   198
   199    def get_name(coro):
   200        # Coroutines compiled with Cython sometimes don't have
   201        # proper __qualname__ or __name__.  While that is a bug
   202        # in Cython, asyncio shouldn't crash with an AttributeError
   203        # in its __repr__ functions.
   204        if is_corowrapper:
   205            return format_helpers._format_callback(coro.func, (), {})
   206
   207        if hasattr(coro, '__qualname__') and coro.__qualname__:
   208            coro_name = coro.__qualname__
   209        elif hasattr(coro, '__name__') and coro.__name__:
   210            coro_name = coro.__name__
   211        else:
   212            # Stop masking Cython bugs, expose them in a friendly way.
   213            coro_name = f'<{type(coro).__name__} without __name__>'
   214        return f'{coro_name}()'
   215
   216    def is_running(coro):
   217        try:
   218            return coro.cr_running
   219        except AttributeError:
   220            try:
   221                return coro.gi_running
   222            except AttributeError:
   223                return False
   224
   225    coro_code = None
   226    if hasattr(coro, 'cr_code') and coro.cr_code:
   227        coro_code = coro.cr_code
   228    elif hasattr(coro, 'gi_code') and coro.gi_code:
   229        coro_code = coro.gi_code
   230
   231    coro_name = get_name(coro)
   232
   233    if not coro_code:
   234        # Built-in types might not have __qualname__ or __name__.
   235        if is_running(coro):
   236            return f'{coro_name} running'
   237        else:
   238            return coro_name
   239
   240    coro_frame = None
   241    if hasattr(coro, 'gi_frame') and coro.gi_frame:
   242        coro_frame = coro.gi_frame
   243    elif hasattr(coro, 'cr_frame') and coro.cr_frame:
   244        coro_frame = coro.cr_frame
   245
   246    # If Cython's coroutine has a fake code object without proper
   247    # co_filename -- expose that.
   248    filename = coro_code.co_filename or '<empty co_filename>'
   249
   250    lineno = 0
   251    if (is_corowrapper and
   252            coro.func is not None and
   253            not inspect.isgeneratorfunction(coro.func)):
   254        source = format_helpers._get_function_source(coro.func)
   255        if source is not None:
   256            filename, lineno = source
   257        if coro_frame is None:
   258            coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'
   259        else:
   260            coro_repr = f'{coro_name} running, defined at {filename}:{lineno}'
   261
   262    elif coro_frame is not None:
   263        lineno = coro_frame.f_lineno
   264        coro_repr = f'{coro_name} running at {filename}:{lineno}'
   265
   266    else:
   267        lineno = coro_code.co_firstlineno
   268        coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'
   269
   270    return coro_repr
   271

View as plain text