Skip to content

Commit 071e719

Browse files
committed
uasyncio: restore uasyncio.__init__ and uasyncio.core from 25fb042
1 parent 1f47a13 commit 071e719

File tree

2 files changed

+232
-359
lines changed

2 files changed

+232
-359
lines changed

uasyncio.core/uasyncio/core.py

Lines changed: 89 additions & 193 deletions
Original file line numberDiff line numberDiff line change
@@ -1,176 +1,140 @@
1-
import utime as time
2-
import utimeq
3-
import ucollections
4-
5-
6-
type_gen = type((lambda: (yield))())
7-
8-
DEBUG = 0
9-
log = None
10-
11-
def set_debug(val):
12-
global DEBUG, log
13-
DEBUG = val
14-
if val:
15-
import logging
16-
log = logging.getLogger("uasyncio.core")
17-
1+
try:
2+
import utime as time
3+
except ImportError:
4+
import time
5+
import uheapq as heapq
6+
import logging
187

198
class CancelledError(Exception):
209
pass
2110

11+
log = logging.getLogger("asyncio")
2212

23-
class TimeoutError(CancelledError):
24-
pass
25-
13+
type_gen = type((lambda: (yield))())
2614

2715
class EventLoop:
2816

29-
def __init__(self, runq_len=16, waitq_len=16):
30-
self.runq = ucollections.deque((), runq_len, True)
31-
self.waitq = utimeq.utimeq(waitq_len)
32-
# Current task being run. Task is a top-level coroutine scheduled
33-
# in the event loop (sub-coroutines executed transparently by
34-
# yield from/await, event loop "doesn't see" them).
35-
self.cur_task = None
17+
def __init__(self):
18+
self.q = []
19+
self.cnt = 0
3620

3721
def time(self):
38-
return time.ticks_ms()
22+
return time.time()
3923

4024
def create_task(self, coro):
4125
# CPython 3.4.2
42-
self.call_later_ms(0, coro)
26+
self.call_at(0, coro)
4327
# CPython asyncio incompatibility: we don't return Task object
4428

4529
def call_soon(self, callback, *args):
46-
if __debug__ and DEBUG:
47-
log.debug("Scheduling in runq: %s", (callback, args))
48-
self.runq.append(callback)
49-
if not isinstance(callback, type_gen):
50-
self.runq.append(args)
30+
self.call_at(self.time(), callback, *args)
5131

5232
def call_later(self, delay, callback, *args):
53-
self.call_at_(time.ticks_add(self.time(), int(delay * 1000)), callback, args)
33+
self.call_at(self.time() + delay, callback, *args)
5434

55-
def call_later_ms(self, delay, callback, *args):
56-
if not delay:
57-
return self.call_soon(callback, *args)
58-
self.call_at_(time.ticks_add(self.time(), delay), callback, args)
59-
60-
def call_at_(self, time, callback, args=()):
61-
if __debug__ and DEBUG:
62-
log.debug("Scheduling in waitq: %s", (time, callback, args))
63-
self.waitq.push(time, callback, args)
35+
def call_at(self, time, callback, *args, exc=None):
36+
# Including self.cnt is a workaround per heapq docs
37+
if __debug__:
38+
log.debug("Scheduling %s", (time, self.cnt, callback, args, exc))
39+
heapq.heappush(self.q, (time, self.cnt, callback, args, exc, False))
40+
# print(self.q)
41+
self.cnt += 1
6442

6543
def wait(self, delay):
6644
# Default wait implementation, to be overriden in subclasses
6745
# with IO scheduling
68-
if __debug__ and DEBUG:
46+
if __debug__:
6947
log.debug("Sleeping for: %s", delay)
70-
time.sleep_ms(delay)
48+
time.sleep(delay)
49+
50+
def cancel(self, callback, exc = CancelledError):
51+
_id = id(callback)
52+
for idx, item in enumerate(self.q):
53+
t, cnt, cb, args, _exc = item
54+
if id(cb) != _id:
55+
continue
56+
if __debug__:
57+
log.debug("Setting discard flag on: %s at index %d", (t, cnt, cb, args, _exc), idx)
58+
self.q[idx] = t, cnt, cb, args, _exc, True
59+
self.call_at(0, cb, *args, exc=exc)
60+
self.remove_polled_cb(callback)
7161

7262
def run_forever(self):
73-
cur_task = [0, 0, 0]
7463
while True:
75-
# Expire entries in waitq and move them to runq
76-
tnow = self.time()
77-
while self.waitq:
78-
t = self.waitq.peektime()
79-
delay = time.ticks_diff(t, tnow)
80-
if delay > 0:
81-
break
82-
self.waitq.pop(cur_task)
83-
if __debug__ and DEBUG:
84-
log.debug("Moving from waitq to runq: %s", cur_task[1])
85-
self.call_soon(cur_task[1], *cur_task[2])
86-
87-
# Process runq
88-
l = len(self.runq)
89-
if __debug__ and DEBUG:
90-
log.debug("Entries in runq: %d", l)
91-
while l:
92-
cb = self.runq.popleft()
93-
l -= 1
94-
args = ()
95-
if not isinstance(cb, type_gen):
96-
args = self.runq.popleft()
97-
l -= 1
98-
if __debug__ and DEBUG:
99-
log.info("Next callback to run: %s", (cb, args))
100-
cb(*args)
64+
if self.q:
65+
tnow = self.time()
66+
if __debug__:
67+
log.debug('*'*20+' sched step start at %s, num tasks in queue %d', tnow, len(self.q))
68+
t, cnt, cb, args, exc, discard = heapq.heappop(self.q)
69+
delay = t - tnow
70+
if __debug__:
71+
log.debug("Next coroutine to run in %s: %s", delay, (t, cnt, cb, args, exc))
72+
if discard:
73+
if __debug__:
74+
log.debug("Discarding: %s", (t, cnt, cb, args, exc, discard))
10175
continue
102-
103-
if __debug__ and DEBUG:
104-
log.info("Next coroutine to run: %s", (cb, args))
105-
self.cur_task = cb
76+
# __main__.mem_info()
77+
if delay > 0 and not exc:
78+
self.call_at(t, cb, *args)
79+
self.wait(delay)
80+
continue
81+
else:
82+
self.wait(-1)
83+
# Assuming IO completion scheduled some tasks
84+
continue
85+
# cancelled callbacks aren't called and nor rescheduled
86+
if callable(cb):
87+
if not exc:
88+
cb(*args)
89+
else:
10690
delay = 0
10791
try:
108-
if args is ():
92+
if __debug__:
93+
log.debug("Coroutine %s send args: %s, %s", cb, args, exc)
94+
if exc:
95+
try:
96+
ret = cb.throw(exc)
97+
except exc:
98+
# ret == None reschedules a canceled task, next round it should raise StopIteration
99+
ret = None
100+
elif args == ():
109101
ret = next(cb)
110102
else:
111103
ret = cb.send(*args)
112-
if __debug__ and DEBUG:
113-
log.info("Coroutine %s yield result: %s", cb, ret)
104+
if __debug__:
105+
log.debug("Coroutine %s yield result: %s", cb, ret)
114106
if isinstance(ret, SysCall1):
115107
arg = ret.arg
116-
if isinstance(ret, SleepMs):
108+
if isinstance(ret, Sleep):
117109
delay = arg
118110
elif isinstance(ret, IORead):
119-
cb.pend_throw(False)
120-
self.add_reader(arg, cb)
111+
# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj)
112+
# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj)
113+
# self.add_reader(arg.fileno(), lambda cb: self.call_soon(cb), cb)
114+
self.add_reader(arg.fileno(), cb)
121115
continue
122116
elif isinstance(ret, IOWrite):
123-
cb.pend_throw(False)
124-
self.add_writer(arg, cb)
117+
# self.add_writer(arg.fileno(), lambda cb: self.call_soon(cb), cb)
118+
self.add_writer(arg.fileno(), cb)
125119
continue
126120
elif isinstance(ret, IOReadDone):
127-
self.remove_reader(arg)
121+
self.remove_reader(arg.fileno(), cb)
128122
elif isinstance(ret, IOWriteDone):
129-
self.remove_writer(arg)
123+
self.remove_writer(arg.fileno(), cb)
130124
elif isinstance(ret, StopLoop):
131125
return arg
132-
else:
133-
assert False, "Unknown syscall yielded: %r (of type %r)" % (ret, type(ret))
134126
elif isinstance(ret, type_gen):
135127
self.call_soon(ret)
136-
elif isinstance(ret, int):
137-
# Delay
138-
delay = ret
139128
elif ret is None:
140129
# Just reschedule
141130
pass
142-
elif ret is False:
143-
# Don't reschedule
144-
continue
145131
else:
146132
assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret))
147133
except StopIteration as e:
148-
if __debug__ and DEBUG:
134+
if __debug__:
149135
log.debug("Coroutine finished: %s", cb)
150136
continue
151-
except CancelledError as e:
152-
if __debug__ and DEBUG:
153-
log.debug("Coroutine cancelled: %s", cb)
154-
continue
155-
# Currently all syscalls don't return anything, so we don't
156-
# need to feed anything to the next invocation of coroutine.
157-
# If that changes, need to pass that value below.
158-
if delay:
159-
self.call_later_ms(delay, cb)
160-
else:
161-
self.call_soon(cb)
162-
163-
# Wait until next waitq task or I/O availability
164-
delay = 0
165-
if not self.runq:
166-
delay = -1
167-
if self.waitq:
168-
tnow = self.time()
169-
t = self.waitq.peektime()
170-
delay = time.ticks_diff(t, tnow)
171-
if delay < 0:
172-
delay = 0
173-
self.wait(delay)
137+
self.call_later(delay, cb, *args)
174138

175139
def run_until_complete(self, coro):
176140
def _run_and_stop():
@@ -179,9 +143,6 @@ def _run_and_stop():
179143
self.call_soon(_run_and_stop())
180144
self.run_forever()
181145

182-
def stop(self):
183-
self.call_soon((lambda: (yield StopLoop(0)))())
184-
185146
def close(self):
186147
pass
187148

@@ -200,6 +161,9 @@ class SysCall1(SysCall):
200161
def __init__(self, arg):
201162
self.arg = arg
202163

164+
class Sleep(SysCall1):
165+
pass
166+
203167
class StopLoop(SysCall1):
204168
pass
205169

@@ -218,82 +182,14 @@ class IOWriteDone(SysCall1):
218182

219183
_event_loop = None
220184
_event_loop_class = EventLoop
221-
def get_event_loop(runq_len=16, waitq_len=16):
185+
def get_event_loop():
222186
global _event_loop
223187
if _event_loop is None:
224-
_event_loop = _event_loop_class(runq_len, waitq_len)
188+
_event_loop = _event_loop_class()
225189
return _event_loop
226190

227191
def sleep(secs):
228-
yield int(secs * 1000)
229-
230-
# Implementation of sleep_ms awaitable with zero heap memory usage
231-
class SleepMs(SysCall1):
232-
233-
def __init__(self):
234-
self.v = None
235-
self.arg = None
236-
237-
def __call__(self, arg):
238-
self.v = arg
239-
#print("__call__")
240-
return self
241-
242-
def __iter__(self):
243-
#print("__iter__")
244-
return self
245-
246-
def __next__(self):
247-
if self.v is not None:
248-
#print("__next__ syscall enter")
249-
self.arg = self.v
250-
self.v = None
251-
return self
252-
#print("__next__ syscall exit")
253-
_stop_iter.__traceback__ = None
254-
raise _stop_iter
255-
256-
_stop_iter = StopIteration()
257-
sleep_ms = SleepMs()
258-
259-
260-
def cancel(coro):
261-
prev = coro.pend_throw(CancelledError())
262-
if prev is False:
263-
_event_loop.call_soon(coro)
264-
265-
266-
class TimeoutObj:
267-
def __init__(self, coro):
268-
self.coro = coro
269-
270-
271-
def wait_for_ms(coro, timeout):
272-
273-
def waiter(coro, timeout_obj):
274-
res = yield from coro
275-
if __debug__ and DEBUG:
276-
log.debug("waiter: cancelling %s", timeout_obj)
277-
timeout_obj.coro = None
278-
return res
279-
280-
def timeout_func(timeout_obj):
281-
if timeout_obj.coro:
282-
if __debug__ and DEBUG:
283-
log.debug("timeout_func: cancelling %s", timeout_obj.coro)
284-
prev = timeout_obj.coro.pend_throw(TimeoutError())
285-
#print("prev pend", prev)
286-
if prev is False:
287-
_event_loop.call_soon(timeout_obj.coro)
288-
289-
timeout_obj = TimeoutObj(_event_loop.cur_task)
290-
_event_loop.call_later_ms(timeout, timeout_func, timeout_obj)
291-
return (yield from waiter(coro, timeout_obj))
292-
293-
294-
def wait_for(coro, timeout):
295-
return wait_for_ms(coro, int(timeout * 1000))
296-
192+
yield Sleep(secs)
297193

298194
def coroutine(f):
299195
return f

0 commit comments

Comments
 (0)