OLD | NEW |
1 """Support for tasks, coroutines and the scheduler.""" | 1 """Support for tasks, coroutines and the scheduler.""" |
2 | 2 |
3 __all__ = ['coroutine', 'Task', | 3 __all__ = ['coroutine', 'Task', |
4 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', | 4 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', |
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async', | 5 'wait', 'wait_for', 'as_completed', 'sleep', 'async', |
6 'gather', | 6 'gather', |
7 ] | 7 ] |
8 | 8 |
9 import collections | 9 import collections |
10 import concurrent.futures | 10 import concurrent.futures |
11 import functools | 11 import functools |
12 import inspect | 12 import inspect |
| 13 import linecache |
| 14 import traceback |
| 15 import weakref |
13 | 16 |
14 from . import events | 17 from . import events |
15 from . import futures | 18 from . import futures |
16 | 19 |
17 | 20 |
18 def coroutine(func): | 21 def coroutine(func): |
19 """Decorator to mark coroutines. | 22 """Decorator to mark coroutines. |
20 | 23 |
21 Decorator wraps non generator functions and returns generator wrapper. | 24 Decorator wraps non generator functions and returns generator wrapper. |
22 If non generator function returns generator of Future it yield-from it. | 25 If non generator function returns generator of Future it yield-from it. |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
55 | 58 |
56 # An important invariant maintained while a Task not done: | 59 # An important invariant maintained while a Task not done: |
57 # | 60 # |
58 # - Either _fut_waiter is None, and _step() is scheduled; | 61 # - Either _fut_waiter is None, and _step() is scheduled; |
59 # - or _fut_waiter is some Future, and _step() is *not* scheduled. | 62 # - or _fut_waiter is some Future, and _step() is *not* scheduled. |
60 # | 63 # |
61 # The only transition from the latter to the former is through | 64 # The only transition from the latter to the former is through |
62 # _wakeup(). When _fut_waiter is not None, one of its callbacks | 65 # _wakeup(). When _fut_waiter is not None, one of its callbacks |
63 # must be _wakeup(). | 66 # must be _wakeup(). |
64 | 67 |
| 68 # Weak set containing all tasks alive. |
| 69 _all_tasks = weakref.WeakSet() |
| 70 |
| 71 @classmethod |
| 72 def all_tasks(cls): |
| 73 """Return a set of all tasks in existence.""" |
| 74 return set(cls._all_tasks) |
| 75 |
| 76 @classmethod |
| 77 def all_pending_tasks(cls): |
| 78 """Return a set of all tasks in existence that aren't done yet.""" |
| 79 return {t for t in cls._all_tasks if not t.done()} |
| 80 |
| 81 @classmethod |
| 82 def all_done_tasks(cls): |
| 83 """Return a set of all tasks in existence that are done.""" |
| 84 return {t for t in cls._all_tasks if t.done()} |
| 85 |
| 86 @classmethod |
| 87 def all_failed_tasks(cls): |
| 88 """Return a set of all tasks in existence that have failed. |
| 89 |
| 90 This includes cancelled tasks and tasks with an exception set. |
| 91 """ |
| 92 return {t for t in cls._all_tasks |
| 93 if t.done() and (t.cancelled() or t.exception())} |
| 94 |
65 def __init__(self, coro, *, loop=None): | 95 def __init__(self, coro, *, loop=None): |
66 assert inspect.isgenerator(coro) # Must be a coroutine *object*. | 96 assert inspect.isgenerator(coro) # Must be a coroutine *object*. |
67 super().__init__(loop=loop) | 97 super().__init__(loop=loop) |
68 self._coro = coro | 98 self._coro = coro |
69 self._fut_waiter = None | 99 self._fut_waiter = None |
70 self._must_cancel = False | 100 self._must_cancel = False |
71 self._loop.call_soon(self._step) | 101 self._loop.call_soon(self._step) |
| 102 self.__class__._all_tasks.add(self) |
72 | 103 |
73 def __repr__(self): | 104 def __repr__(self): |
74 res = super().__repr__() | 105 res = super().__repr__() |
75 if (self._must_cancel and | 106 if (self._must_cancel and |
76 self._state == futures._PENDING and | 107 self._state == futures._PENDING and |
77 '<PENDING' in res): | 108 '<PENDING' in res): |
78 res = res.replace('<PENDING', '<CANCELLING', 1) | 109 res = res.replace('<PENDING', '<CANCELLING', 1) |
79 i = res.find('<') | 110 i = res.find('<') |
80 if i < 0: | 111 if i < 0: |
81 i = len(res) | 112 i = len(res) |
82 res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:] | 113 res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:] |
83 return res | 114 return res |
84 | 115 |
| 116 def get_stack(self): |
| 117 """Return the list of stack frames for this task's coroutine. |
| 118 |
| 119 If the coroutine is active, this returns the stack where it is |
| 120 suspended. If the coroutine has completed successfully or was |
| 121 cancelled, this returns an empty list. If the coroutine was |
| 122 terminated by an exception, this returns the list of traceback |
| 123 frames. |
| 124 """ |
| 125 frames = [] |
| 126 f = self._coro.gi_frame |
| 127 if f is not None: |
| 128 while f is not None: |
| 129 frames.append(f) |
| 130 f = f.f_back |
| 131 elif self._exception is not None: |
| 132 tb = self._exception.__traceback__ |
| 133 while tb is not None: |
| 134 frames.append(tb.tb_frame) |
| 135 tb = tb.tb_next |
| 136 frames.reverse() |
| 137 return frames |
| 138 |
| 139 def print_stack(self, limit=None, file=None): |
| 140 extracted_list = [] |
| 141 checked = set() |
| 142 for f in self.get_stack(): |
| 143 lineno = f.f_lineno |
| 144 co = f.f_code |
| 145 filename = co.co_filename |
| 146 name = co.co_name |
| 147 if filename not in checked: |
| 148 checked.add(filename) |
| 149 linecache.checkcache(filename) |
| 150 line = linecache.getline(filename, lineno, f.f_globals) |
| 151 extracted_list.append((filename, lineno, name, line)) |
| 152 if not extracted_list: |
| 153 print('No traceback for %r' % self, file=file) |
| 154 elif self._exception is not None: |
| 155 print('Traceback for %r (most recent call last):' % self, |
| 156 file=file) |
| 157 else: |
| 158 print('Stack for %r (most recent call last):' % self, |
| 159 file=file) |
| 160 traceback.print_list(extracted_list, file=file) |
| 161 |
85 def cancel(self): | 162 def cancel(self): |
86 if self.done(): | 163 if self.done(): |
87 return False | 164 return False |
88 if self._fut_waiter is not None: | 165 if self._fut_waiter is not None: |
89 if self._fut_waiter.cancel(): | 166 if self._fut_waiter.cancel(): |
90 # Leave self._fut_waiter; it may be a Task that | 167 # Leave self._fut_waiter; it may be a Task that |
91 # catches and ignores the cancellation so we may have | 168 # catches and ignores the cancellation so we may have |
92 # to cancel it again later. | 169 # to cancel it again later. |
93 return True | 170 return True |
94 # It must be the case that self._step is already scheduled. | 171 # It must be the case that self._step is already scheduled. |
(...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
463 outer.cancel() | 540 outer.cancel() |
464 else: | 541 else: |
465 exc = inner.exception() | 542 exc = inner.exception() |
466 if exc is not None: | 543 if exc is not None: |
467 outer.set_exception(exc) | 544 outer.set_exception(exc) |
468 else: | 545 else: |
469 outer.set_result(inner.result()) | 546 outer.set_result(inner.result()) |
470 | 547 |
471 inner.add_done_callback(_done_callback) | 548 inner.add_done_callback(_done_callback) |
472 return outer | 549 return outer |
OLD | NEW |