Skip to content

Commit a888396

Browse files
committed
Use newer event loop implementation (based on asi2700-master branch)
1 parent 04d215f commit a888396

File tree

2 files changed

+277
-133
lines changed

2 files changed

+277
-133
lines changed

uasyncio.core/uasyncio/core.py

Lines changed: 214 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,120 +1,148 @@
1-
try:
2-
import utime as time
3-
except ImportError:
4-
import time
5-
import uheapq as heapq
6-
import logging
1+
import utime as time
2+
import utimeq
3+
import ucollections
4+
5+
6+
type_gen = type((lambda: (yield))())
7+
8+
DEBUG = 0
9+
log = None
10+
11+
def set_debug(val):
12+
global DEBUG, log
13+
DEBUG = val
14+
if val:
15+
import logging
16+
log = logging.getLogger("uasyncio.core")
17+
log.setLevel(val)
18+
719

820
class CancelledError(Exception):
921
pass
1022

11-
log = logging.getLogger("asyncio")
1223

13-
type_gen = type((lambda: (yield))())
24+
class TimeoutError(CancelledError):
25+
pass
26+
1427

1528
class EventLoop:
1629

17-
def __init__(self):
18-
self.q = []
19-
self.cnt = 0
30+
def __init__(self, runq_len=16, waitq_len=16):
31+
self.runq = ucollections.deque((), runq_len, True)
32+
self.waitq = utimeq.utimeq(waitq_len)
33+
# Current task being run. Task is a top-level coroutine scheduled
34+
# in the event loop (sub-coroutines executed transparently by
35+
# yield from/await, event loop "doesn't see" them).
36+
self.cur_task = None
2037

2138
def time(self):
22-
return time.time()
39+
return time.ticks_ms()
2340

2441
def create_task(self, coro):
2542
# CPython 3.4.2
26-
self.call_at(0, coro)
43+
self.call_later_ms(0, coro)
2744
# CPython asyncio incompatibility: we don't return Task object
2845

2946
def call_soon(self, callback, *args):
30-
self.call_at(self.time(), callback, *args)
47+
if __debug__ and DEBUG:
48+
log.debug("Scheduling in runq: %s", (callback, args))
49+
self.runq.append(callback)
50+
if not isinstance(callback, type_gen):
51+
self.runq.append(args)
3152

3253
def call_later(self, delay, callback, *args):
33-
self.call_at(self.time() + delay, callback, *args)
54+
self.call_at_(time.ticks_add(self.time(), int(delay * 1000)), callback, args)
3455

35-
def call_at(self, time, callback, *args, exc=None):
36-
# Including self.cnt is a workaround per heapq docs
37-
if __debug__:
38-
log.debug("Scheduling %s", (time, self.cnt, callback, args, exc))
39-
heapq.heappush(self.q, (time, self.cnt, callback, args, exc, False))
40-
# print(self.q)
41-
self.cnt += 1
56+
def call_later_ms(self, delay, callback, *args):
57+
if not delay:
58+
return self.call_soon(callback, *args)
59+
self.call_at_(time.ticks_add(self.time(), delay), callback, args)
60+
61+
def call_at_(self, time, callback, args=()):
62+
if __debug__ and DEBUG:
63+
log.debug("Scheduling in waitq: %s", (time, callback, args))
64+
self.waitq.push(time, callback, args)
4265

4366
def wait(self, delay):
4467
# Default wait implementation, to be overriden in subclasses
4568
# with IO scheduling
46-
if __debug__:
69+
if __debug__ and DEBUG:
4770
log.debug("Sleeping for: %s", delay)
48-
time.sleep(delay)
49-
50-
def cancel(self, callback, exc = CancelledError):
51-
_id = id(callback)
52-
for idx, item in enumerate(self.q):
53-
t, cnt, cb, args, _exc, _discard = item
54-
if id(cb) != _id:
55-
continue
56-
if __debug__:
57-
log.debug("Setting discard flag on: %s at index %d", (t, cnt, cb, args, _exc), idx)
58-
self.q[idx] = t, cnt, cb, args, _exc, True
59-
self.call_at(0, cb, *args, exc=exc)
60-
self.remove_polled_cb(callback)
71+
time.sleep_ms(delay)
72+
73+
def cancel(self, coro, exc = CancelledError):
74+
if isinstance(coro, type_gen):
75+
try:
76+
prev = coro.pend_throw(exc)
77+
if __debug__ and DEBUG:
78+
log.debug("Cancelling %s asynchronously", coro)
79+
if prev is False:
80+
_event_loop.remove_polled_cb(coro)
81+
_event_loop.call_soon(coro)
82+
except TypeError:
83+
# .pend_throw() works only on started coroutines
84+
# Kill the coro right here, right now
85+
# No need to worry about IO because the coro cannot be registered
86+
# because it didn't get a chance to run yet
87+
try:
88+
if __debug__ and DEBUG:
89+
log.debug("Cancelling %s synchronously", coro)
90+
coro.throw(exc)
91+
except exc:
92+
pass
93+
else:
94+
raise UnimplementedError('Cancelling a callback is not supported')
6195

6296
def run_forever(self):
97+
cur_task = [0, 0, 0]
6398
while True:
64-
if self.q:
65-
tnow = self.time()
66-
if __debug__:
67-
log.debug('*'*20+' sched step start at %s, num tasks in queue %d', tnow, len(self.q))
68-
t, cnt, cb, args, exc, discard = heapq.heappop(self.q)
69-
delay = t - tnow
70-
if __debug__:
71-
log.debug("Next coroutine to run in %s: %s", delay, (t, cnt, cb, args, exc))
72-
if discard:
73-
if __debug__:
74-
log.debug("Discarding: %s", (t, cnt, cb, args, exc, discard))
75-
continue
76-
# __main__.mem_info()
77-
if delay > 0 and not exc:
78-
self.call_at(t, cb, *args)
79-
self.wait(delay)
80-
continue
81-
else:
82-
self.wait(-1)
83-
# Assuming IO completion scheduled some tasks
84-
continue
85-
# cancelled callbacks aren't called and nor rescheduled
86-
if callable(cb):
87-
if not exc:
99+
# Expire entries in waitq and move them to runq
100+
tnow = self.time()
101+
while self.waitq:
102+
t = self.waitq.peektime()
103+
delay = time.ticks_diff(t, tnow)
104+
if delay > 0:
105+
break
106+
self.waitq.pop(cur_task)
107+
if __debug__ and DEBUG:
108+
log.debug("Moving from waitq to runq: %s", cur_task[1])
109+
self.call_soon(cur_task[1], *cur_task[2])
110+
111+
# Process runq
112+
l = len(self.runq)
113+
if __debug__ and DEBUG:
114+
log.debug("Entries in runq: %d", l)
115+
while l:
116+
cb = self.runq.popleft()
117+
l -= 1
118+
args = ()
119+
if not isinstance(cb, type_gen):
120+
args = self.runq.popleft()
121+
l -= 1
122+
if __debug__ and DEBUG:
123+
log.info("Next callback to run: %s", (cb, args))
88124
cb(*args)
89-
else:
125+
continue
126+
127+
if __debug__ and DEBUG:
128+
log.info("Next coroutine to run: %s", (cb, args))
129+
self.cur_task = cb
90130
delay = 0
91131
try:
92-
if __debug__:
93-
log.debug("Coroutine %s send args: %s, %s", cb, args, exc)
94-
if exc:
95-
try:
96-
ret = cb.throw(exc)
97-
except exc:
98-
# ret == None reschedules a canceled task, next round it should raise StopIteration
99-
ret = None
100-
elif args == ():
132+
if args is ():
101133
ret = next(cb)
102134
else:
103135
ret = cb.send(*args)
104-
if __debug__:
105-
log.debug("Coroutine %s yield result: %s", cb, ret)
136+
if __debug__ and DEBUG:
137+
log.info("Coroutine %s yield result: %s", cb, ret)
106138
if isinstance(ret, SysCall1):
107139
arg = ret.arg
108-
if isinstance(ret, Sleep):
140+
if isinstance(ret, SleepMs):
109141
delay = arg
110142
elif isinstance(ret, IORead):
111-
# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj)
112-
# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj)
113-
# self.add_reader(arg.fileno(), lambda cb: self.call_soon(cb), cb)
114143
self.add_reader(arg.fileno(), cb)
115144
continue
116145
elif isinstance(ret, IOWrite):
117-
# self.add_writer(arg.fileno(), lambda cb: self.call_soon(cb), cb)
118146
self.add_writer(arg.fileno(), cb)
119147
continue
120148
elif isinstance(ret, IOReadDone):
@@ -123,18 +151,56 @@ def run_forever(self):
123151
self.remove_writer(arg.fileno(), cb)
124152
elif isinstance(ret, StopLoop):
125153
return arg
154+
else:
155+
assert False, "Unknown syscall yielded: %r (of type %r)" % (ret, type(ret))
126156
elif isinstance(ret, type_gen):
127157
self.call_soon(ret)
158+
elif isinstance(ret, int):
159+
# Delay
160+
delay = ret
128161
elif ret is None:
129162
# Just reschedule
130163
pass
164+
elif ret is False:
165+
# Don't reschedule
166+
continue
131167
else:
132168
assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret))
133169
except StopIteration as e:
134-
if __debug__:
170+
if __debug__ and DEBUG:
135171
log.debug("Coroutine finished: %s", cb)
172+
self.remove_polled_cb(cb)
173+
continue
174+
except CancelledError as e:
175+
if __debug__ and DEBUG:
176+
log.debug("Coroutine cancelled: %s", cb)
177+
self.remove_polled_cb(cb)
136178
continue
137-
self.call_later(delay, cb, *args)
179+
except Exception as e:
180+
if log:
181+
log.error("Coroutine %s exception: %s", cb, e)
182+
# Currently all syscalls don't return anything, so we don't
183+
# need to feed anything to the next invocation of coroutine.
184+
# If that changes, need to pass that value below.
185+
if delay:
186+
self.call_later_ms(delay, cb)
187+
else:
188+
self.call_soon(cb)
189+
190+
# Wait until next waitq task or I/O availability
191+
delay = 0
192+
if not self.runq:
193+
if self.waitq:
194+
tnow = self.time()
195+
t = self.waitq.peektime()
196+
delay = time.ticks_diff(t, tnow)
197+
if delay < 0:
198+
delay = 0
199+
else:
200+
if __debug__ and DEBUG:
201+
log.info("No more tasks to execute, waiting forever")
202+
delay = -1
203+
self.wait(delay)
138204

139205
def run_until_complete(self, coro):
140206
def _run_and_stop():
@@ -143,6 +209,9 @@ def _run_and_stop():
143209
self.call_soon(_run_and_stop())
144210
self.run_forever()
145211

212+
def stop(self):
213+
self.call_soon((lambda: (yield StopLoop(0)))())
214+
146215
def close(self):
147216
pass
148217

@@ -161,9 +230,6 @@ class SysCall1(SysCall):
161230
def __init__(self, arg):
162231
self.arg = arg
163232

164-
class Sleep(SysCall1):
165-
pass
166-
167233
class StopLoop(SysCall1):
168234
pass
169235

@@ -182,14 +248,77 @@ class IOWriteDone(SysCall1):
182248

183249
_event_loop = None
184250
_event_loop_class = EventLoop
185-
def get_event_loop():
251+
def get_event_loop(runq_len=16, waitq_len=16):
186252
global _event_loop
187253
if _event_loop is None:
188-
_event_loop = _event_loop_class()
254+
_event_loop = _event_loop_class(runq_len, waitq_len)
189255
return _event_loop
190256

191257
def sleep(secs):
192-
yield Sleep(secs)
258+
yield int(secs * 1000)
259+
260+
# Implementation of sleep_ms awaitable with zero heap memory usage
261+
class SleepMs(SysCall1):
262+
263+
def __init__(self):
264+
self.v = None
265+
self.arg = None
266+
267+
def __call__(self, arg):
268+
self.v = arg
269+
#print("__call__")
270+
return self
271+
272+
def __iter__(self):
273+
#print("__iter__")
274+
return self
275+
276+
def __next__(self):
277+
if self.v is not None:
278+
#print("__next__ syscall enter")
279+
self.arg = self.v
280+
self.v = None
281+
return self
282+
#print("__next__ syscall exit")
283+
_stop_iter.__traceback__ = None
284+
raise _stop_iter
285+
286+
_stop_iter = StopIteration()
287+
sleep_ms = SleepMs()
288+
289+
290+
def cancel(coro, exc=CancelledError):
291+
_event_loop.cancel(coro, exc=exc)
292+
293+
294+
class TimeoutObj:
295+
def __init__(self, coro):
296+
self.coro = coro
297+
298+
299+
def wait_for_ms(coro, timeout):
300+
301+
def waiter(coro, timeout_obj):
302+
res = yield from coro
303+
if __debug__ and DEBUG:
304+
log.debug("waiter: cancelling %s", timeout_obj)
305+
timeout_obj.coro = None
306+
return res
307+
308+
def timeout_func(timeout_obj):
309+
if timeout_obj.coro:
310+
if __debug__ and DEBUG:
311+
log.debug("timeout_func: cancelling %s", timeout_obj.coro)
312+
cancel(timeout_obj.coro, exc=TimeoutError())
313+
314+
timeout_obj = TimeoutObj(_event_loop.cur_task)
315+
_event_loop.call_later_ms(timeout, timeout_func, timeout_obj)
316+
return (yield from waiter(coro, timeout_obj))
317+
318+
319+
def wait_for(coro, timeout):
320+
return wait_for_ms(coro, int(timeout * 1000))
321+
193322

194323
def coroutine(f):
195324
return f

0 commit comments

Comments
 (0)