# From CPython (Lib/asyncio/coroutines.py)
2__all__ = 'coroutine', 'iscoroutinefunction', 'iscoroutine'
3
4import collections.abc
5import functools
6import inspect
7import os
8import sys
9import traceback
10import types
11import warnings
12
13from . import base_futures
14from . import constants
15from . import format_helpers
16from .log import logger
17
18
def _is_debug_mode():
    """Return a truthy value when asyncio debug mode is requested.

    Debug mode is on when the interpreter runs in development mode
    (``sys.flags.dev_mode``), or when the ``PYTHONASYNCIODEBUG``
    environment variable is set to a non-empty value and the interpreter
    is not ignoring environment variables
    (``sys.flags.ignore_environment``).
    """
    dev_mode = sys.flags.dev_mode
    if dev_mode:
        return dev_mode
    if sys.flags.ignore_environment:
        # The environment must not be consulted at all in this case.
        return False
    return bool(os.environ.get('PYTHONASYNCIODEBUG'))


# When _DEBUG is true, @coroutine wraps the resulting generator objects
# in a CoroWrapper instance (defined below).  That instance logs a
# message when the generator is never iterated over, which may happen
# when you forget to use "await" or "yield from" with a coroutine call.
#
# The flag is read when the decorator is applied, so to be of any use it
# must be set before the coroutines are defined.  A downside of this
# feature is that tracebacks show entries for CoroWrapper.__next__ when
# _DEBUG is true.
_DEBUG = _is_debug_mode()
35
36
class CoroWrapper:
    """Wrapper for a coroutine object, used in _DEBUG mode.

    The wrapper forwards the generator protocol (``send``, ``throw``,
    ``close``, iteration and the ``gi_*`` attributes) to the wrapped
    object, remembers the stack at which it was created, and logs an
    error from ``__del__`` if the wrapped generator was never started.
    """

    def __init__(self, gen, func=None):
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        # The decorated function; used to unwrap the @coroutine decorator.
        self.func = func
        # Stack of our caller (frame 1), shown by __repr__ and __del__.
        self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
        self.__name__ = getattr(gen, '__name__', None)
        self.__qualname__ = getattr(gen, '__qualname__', None)

    def __repr__(self):
        coro_repr = _format_coroutine(self)
        source_tb = self._source_traceback
        if source_tb:
            # Report the innermost recorded frame: (filename, lineno, ...).
            frame = source_tb[-1]
            coro_repr += f', created at {frame[0]}:{frame[1]}'

        return f'<{self.__class__.__name__} {coro_repr}>'

    # -- iteration / awaiting: the wrapper is its own iterator ----------

    def __iter__(self):
        return self

    def __await__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    # -- generator protocol, delegated to the wrapped object ------------

    def send(self, value):
        """Forward *value* to the wrapped generator's send()."""
        return self.gen.send(value)

    def throw(self, type, value=None, traceback=None):
        """Forward the exception triple to the wrapped generator's throw()."""
        return self.gen.throw(type, value, traceback)

    def close(self):
        """Close the wrapped generator."""
        return self.gen.close()

    # -- read-only views of the wrapped generator's state ---------------

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    @property
    def gi_yieldfrom(self):
        return self.gen.gi_yieldfrom

    def __del__(self):
        # Be careful: self.gen (and therefore its frame) might not exist,
        # so every attribute is fetched defensively.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        if frame is None or frame.f_lasti != -1:
            # Nothing to report unless a frame exists with f_lasti == -1.
            return

        msg = f'{self!r} was never yielded from'
        tb = getattr(self, '_source_traceback', ())
        if tb:
            tb = ''.join(traceback.format_list(tb))
            msg += (f'\nCoroutine object created at '
                    f'(most recent call last, truncated to '
                    f'{constants.DEBUG_STACK_DEPTH} last lines):\n')
            msg += tb.rstrip()
        logger.error(msg)
104
105
def coroutine(func):
    """Decorator to mark generator-based coroutines (deprecated).

    Emits a DeprecationWarning on every use.  A function defined with
    "async def" is returned unchanged.  A generator function is passed
    through types.coroutine(); any other callable is first wrapped in a
    generator that runs it and, if the result is a future, a generator,
    a CoroWrapper or an awaitable, yields from that result.

    When the module-level _DEBUG flag is true at decoration time, calls
    return a CoroWrapper, which logs an error message if the coroutine
    is destroyed without ever having been yielded from.
    """
    warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
                  DeprecationWarning,
                  stacklevel=2)
    if inspect.iscoroutinefunction(func):
        # Coroutines defined with "async def" need no further treatment.
        return func

    coro = func
    if not inspect.isgeneratorfunction(func):
        # Adapt a plain callable into a generator function.
        @functools.wraps(func)
        def coro(*args, **kwargs):
            result = func(*args, **kwargs)
            if (base_futures.isfuture(result) or
                    inspect.isgenerator(result) or
                    isinstance(result, CoroWrapper)):
                return (yield from result)

            # If 'result' is an awaitable, run it.
            try:
                await_meth = result.__await__
            except AttributeError:
                return result
            if isinstance(result, collections.abc.Awaitable):
                result = yield from await_meth()
            return result

    coro = types.coroutine(coro)

    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func=func)
            # Drop the innermost recorded frame (the last entry).
            if w._source_traceback:
                del w._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__ on generator
            # objects, so it is set manually.  getattr is used because
            # some callables (such as functools.partial) may lack
            # __qualname__.
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w
    else:
        wrapper = coro

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper
159
160
# Unique marker object.  The @coroutine decorator stores it on the
# functions it returns (as the ``_is_coroutine`` attribute) so that
# iscoroutinefunction() can recognize them.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    if inspect.iscoroutinefunction(func):
        return True
    # Otherwise it must carry this module's marker by identity; any other
    # value stored under the same attribute name does not count.
    marker = getattr(func, '_is_coroutine', None)
    return marker is _is_coroutine
169
170
# Types that iscoroutine() accepts.  The native coroutine type is listed
# first to prioritize that check and speed up asyncio.iscoroutine.
_COROUTINE_TYPES = (
    types.CoroutineType,
    types.GeneratorType,
    collections.abc.Coroutine,
    CoroWrapper,
)

# Concrete types that iscoroutine() has already accepted.
_iscoroutine_typecache = set()
176
177
def iscoroutine(obj):
    """Return True if obj is a coroutine object."""
    obj_type = type(obj)
    if obj_type in _iscoroutine_typecache:
        # Fast path: this exact type was accepted before.
        return True

    if not isinstance(obj, _COROUTINE_TYPES):
        return False

    # Never cache more than 100 positive types.  That limit should not
    # be reached unless someone is stressing the system on purpose.
    if len(_iscoroutine_typecache) < 100:
        _iscoroutine_typecache.add(obj_type)
    return True
192
193
def _format_coroutine(coro):
    """Return a one-line description of *coro* for reprs and log messages.

    The text is the coroutine's name followed by its state ("running" or
    "done") and, when a code object is available, a file name and line
    number.  *coro* must satisfy iscoroutine().
    """
    assert iscoroutine(coro)

    is_corowrapper = isinstance(coro, CoroWrapper)

    def get_name(coro):
        if is_corowrapper:
            # Describe the decorated function rather than the wrapper.
            return format_helpers._format_callback(coro.func, (), {})

        # Coroutines compiled with Cython sometimes don't have a proper
        # __qualname__ or __name__.  While that is a bug in Cython,
        # asyncio shouldn't crash with an AttributeError in its
        # __repr__ functions.
        coro_name = (getattr(coro, '__qualname__', None) or
                     getattr(coro, '__name__', None))
        if not coro_name:
            # Stop masking Cython bugs, expose them in a friendly way.
            coro_name = f'<{type(coro).__name__} without __name__>'
        return f'{coro_name}()'

    def is_running(coro):
        # Prefer the native-coroutine attribute, then the generator one.
        for attr in ('cr_running', 'gi_running'):
            try:
                return getattr(coro, attr)
            except AttributeError:
                pass
        return False

    coro_code = (getattr(coro, 'cr_code', None) or
                 getattr(coro, 'gi_code', None))

    coro_name = get_name(coro)

    if not coro_code:
        # Built-in types might not have __qualname__ or __name__.
        if is_running(coro):
            return f'{coro_name} running'
        return coro_name

    # A missing or falsy frame attribute is normalized to None.
    coro_frame = (getattr(coro, 'gi_frame', None) or
                  getattr(coro, 'cr_frame', None) or
                  None)

    # If Cython's coroutine has a fake code object without proper
    # co_filename -- expose that.
    filename = coro_code.co_filename or '<empty co_filename>'

    if (is_corowrapper and
            coro.func is not None and
            not inspect.isgeneratorfunction(coro.func)):
        # The decorated callable is not a generator function, so report
        # where that callable itself is defined (line 0 if unknown).
        lineno = 0
        source = format_helpers._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if coro_frame is None:
            return f'{coro_name} done, defined at {filename}:{lineno}'
        return f'{coro_name} running, defined at {filename}:{lineno}'

    if coro_frame is not None:
        lineno = coro_frame.f_lineno
        return f'{coro_name} running at {filename}:{lineno}'

    lineno = coro_code.co_firstlineno
    return f'{coro_name} done, defined at {filename}:{lineno}'
271
# View as plain text  (web-page residue from extraction; not part of the module)