Skip to content

Commit c414c20

Browse files
committed
Uasyncio V2.0 code changes.
1 parent cf315bd commit c414c20

File tree

10 files changed

+155
-147
lines changed

10 files changed

+155
-147
lines changed

FASTPOLL.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,11 @@ priority, one with higher and one with lower priority than standard. It
238238
provides a replacement for `uasyncio.get_event_loop()` enabling the queue
239239
sizes to be set.
240240

241-
`aysncio_priority.get_event_loop(len, lpqlen)`
241+
`aysncio_priority.get_event_loop(runq_len, waitq_len, lpqlen)`
242242
Arguments:
243-
1. `len` Length of normal queue. Default 42 tasks.
244-
2. `lpqlen` Length of low priority queue. Default 42.
243+
1. `runq_len` Length of normal queue. Default 16 tasks.
244+
2. `waitq_len` Length of wait queue. Default 16.
245+
3. `lpqlen` Length of low priority queue. Default 16.
245246

246247
The low priority solution is based on the notion of "after" implying a time
247248
delay which can be expected to be less precise than the asyncio standard calls.
@@ -348,12 +349,11 @@ priority tasks which are pending execution.
348349
A simple demo of this is `benchmarks/call_lp.py`. Documentation is in the
349350
code.
350351

351-
`call_after_ms(delay, callback, args=())` Call with low priority. Positional
352+
`call_after_ms(delay, callback, *args)` Call with low priority. Positional
352353
args:
353354
1. `delay` Integer. Minimum delay in millisecs before callback runs.
354355
2. `callback` The callback to run.
355-
3. `args` Optional tuple or other iterable with positional args for the
356-
callback.
356+
3. `*args` Optional positional args for the callback.
357357

358358
###### [Jump to Contents](./FASTPOLL.md#contents)
359359

asyncio_priority.py

Lines changed: 113 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -28,43 +28,35 @@
2828

2929
import utime as time
3030
import utimeq
31+
import ucollections
3132
from uasyncio import *
3233

3334
class PriorityEventLoop(PollEventLoop):
34-
def __init__(self, len=42, lpqlen=42):
35-
super().__init__(len)
35+
def __init__(self, runq_len=16, waitq_len=16, lpqlen=42):
36+
super().__init__(runq_len, waitq_len)
3637
self._max_overdue_ms = 0
3738
self.lpq = utimeq.utimeq(lpqlen)
38-
self.hp_tasks = None
39-
self.create_task(self.lp_monitor())
40-
41-
# Monitor low priority tasks. If one can be scheduled, remove from LP queue
42-
# and queue it for scheduling.
43-
# If a normal task is ready we only queue an LP one which is due by more
44-
# than the max_overdue_ms threshold.
45-
# If no normal task is ready we queue the most overdue LP task.
46-
# Readiness is determined by whether it is actually due. An option would be
47-
# to check if it's due in less than N ms. This would reduce competition at
48-
# the cost of the application having to consider tight loops which pend for
49-
# < N ms.
50-
async def lp_monitor(self):
51-
this_task = [0, 0, 0]
52-
while True:
53-
if self.lpq:
54-
tnow = self.time()
55-
t = self.lpq.peektime()
56-
tim = time.ticks_diff(t, tnow)
57-
to_run = self._max_overdue_ms > 0 and tim < -self._max_overdue_ms
58-
if not to_run: # No overdue LP task. Are any normal tasks due?
59-
can_run = True # If q is empty can run an LP task
60-
if self.q:
61-
t = self.q.peektime()
62-
can_run = time.ticks_diff(t, tnow) > 0 # No normal task is ready -
63-
to_run = can_run and tim <= 0 # run if so and an LP one is ready
64-
if to_run:
65-
self.lpq.pop(this_task)
66-
self.q.push(*this_task)
67-
yield
39+
self.hp_tasks = []
40+
41+
# Schedule a single low priority task if one is ready or overdue.
42+
# The most overdue task is scheduled even if normal tasks are pending.
43+
# The most due task is scheduled only if no normal tasks are pending.
44+
def schedule_lp_task(self, cur_task, tnow):
45+
t = self.lpq.peektime()
46+
tim = time.ticks_diff(t, tnow)
47+
to_run = self._max_overdue_ms > 0 and tim < -self._max_overdue_ms
48+
if not to_run: # No overdue LP task.
49+
if len(self.runq):
50+
return False
51+
to_run = tim <= 0 # True if LP task is due
52+
if to_run and self.waitq: # Set False if a normal tasks is due.
53+
t = self.waitq.peektime()
54+
to_run = time.ticks_diff(t, tnow) > 0 # No normal task is ready
55+
if to_run:
56+
self.lpq.pop(cur_task)
57+
self.call_soon(cur_task[1], *cur_task[2])
58+
return True
59+
return False
6860

6961
def max_overdue_ms(self, t=None):
7062
if t is not None:
@@ -73,17 +65,16 @@ def max_overdue_ms(self, t=None):
7365

7466
# Low priority versions of call_later() call_later_ms() and call_at_()
7567
def call_after_ms(self, delay, callback, *args):
76-
self.call_at_lp_(time.ticks_add(self.time(), delay), callback, args)
68+
self.call_at_lp_(time.ticks_add(self.time(), delay), callback, *args)
69+
7770

7871
def call_after(self, delay, callback, *args):
79-
self.call_at_lp_(time.ticks_add(self.time(), int(delay * 1000)), callback, args)
72+
self.call_at_lp_(time.ticks_add(self.time(), int(delay * 1000)), callback, *args)
8073

81-
def call_at_lp_(self, time, callback, args=()):
74+
def call_at_lp_(self, time, callback, *args):
8275
self.lpq.push(time, callback, args)
8376

84-
def _schedule_hp(self, func, callback, args=()):
85-
if self.hp_tasks is None:
86-
self.hp_tasks = []
77+
def _schedule_hp(self, func, callback, *args):
8778
# If there's an empty slot, assign without allocation
8879
for entry in self.hp_tasks: # O(N) search - but N is typically 1 or 2...
8980
if not entry[0]:
@@ -97,80 +88,79 @@ def _schedule_hp(self, func, callback, args=()):
9788
def run_forever(self):
9889
cur_task = [0, 0, 0]
9990
while True:
100-
if self.q:
101-
# wait() may finish prematurely due to I/O completion,
102-
# and schedule new, earlier than before tasks to run.
103-
while 1:
104-
# Check list of high priority tasks
105-
if self.hp_tasks is not None:
106-
hp_found = False
107-
for entry in self.hp_tasks:
108-
if entry[0] and entry[0]():
109-
hp_found = True
110-
entry[0] = 0
111-
cur_task[0] = 0
112-
cur_task[1] = entry[1] # ??? quick non-allocating copy
113-
cur_task[2] = entry[2]
114-
break
115-
if hp_found:
116-
break
117-
118-
# Schedule any due normal task
119-
t = self.q.peektime()
120-
tnow = self.time()
91+
tnow = self.time()
92+
# Schedule a LP task if no normal task is ready
93+
l = len(self.lpq)
94+
if (l and not self.schedule_lp_task(cur_task, tnow)) or l == 0:
95+
# Expire entries in waitq and move them to runq
96+
while self.waitq:
97+
t = self.waitq.peektime()
12198
delay = time.ticks_diff(t, tnow)
122-
if delay <= 0:
123-
# Always call wait(), to give a chance to I/O scheduling
124-
self.wait(0)
125-
self.q.pop(cur_task)
99+
if delay > 0:
126100
break
101+
self.waitq.pop(cur_task)
102+
if __debug__ and DEBUG:
103+
log.debug("Moving from waitq to runq: %s", cur_task[1])
104+
self.call_soon(cur_task[1], *cur_task[2])
105+
106+
# Process runq
107+
l = len(self.runq)
108+
if __debug__ and DEBUG:
109+
log.debug("Entries in runq: %d", l)
110+
while l:
111+
# Check list of high priority tasks
112+
cb = None
113+
for entry in self.hp_tasks:
114+
if entry[0] and entry[0](): # Ready to run
115+
entry[0] = 0
116+
cb = entry[1]
117+
args = entry[2]
118+
break
119+
120+
if cb is None:
121+
cb = self.runq.popleft()
122+
l -= 1
123+
args = ()
124+
if not isinstance(cb, type_gen):
125+
args = self.runq.popleft()
126+
l -= 1
127+
if __debug__ and DEBUG:
128+
log.info("Next callback to run: %s", (cb, args))
129+
cb(*args)
130+
continue
127131

128-
self.wait(delay) # Handled in superclass
129-
t = cur_task[0]
130-
cb = cur_task[1]
131-
args = cur_task[2]
132132
if __debug__ and DEBUG:
133-
log.debug("Next coroutine to run: %s", (t, cb, args))
134-
# __main__.mem_info()
133+
log.info("Next coroutine to run: %s", (cb, args))
135134
self.cur_task = cb
136-
else:
137-
self.wait(-1)
138-
# Assuming IO completion scheduled some tasks
139-
continue
140-
if callable(cb):
141-
cb(*args)
142-
else:
143135
delay = 0
144136
func = None
145-
priority = True
137+
low_priority = False # Assume normal priority
146138
try:
147-
if __debug__ and DEBUG:
148-
log.debug("Coroutine %s send args: %s", cb, args)
149-
if args == ():
150-
ret = next(cb) # See notes at end of code
139+
if args is ():
140+
ret = next(cb)
151141
else:
152142
ret = cb.send(*args)
153143
if __debug__ and DEBUG:
154-
log.debug("Coroutine %s yield result: %s", cb, ret)
144+
log.info("Coroutine %s yield result: %s", cb, ret)
155145
if isinstance(ret, SysCall1):
156146
arg = ret.arg
157-
if isinstance(ret, After):
158-
delay = int(arg * 1000)
159-
priority = False
160-
elif isinstance(ret, AfterMs):
161-
delay = int(arg)
162-
priority = False
163-
elif isinstance(ret, When):
164-
if callable(arg):
165-
func = arg
166-
else:
167-
assert False, "Argument to 'when' must be a function or method."
168-
elif isinstance(ret, SleepMs):
147+
if isinstance(ret, SleepMs):
169148
delay = arg
149+
if isinstance(ret, AfterMs):
150+
low_priority = True
151+
if isinstance(ret, After):
152+
delay = int(delay*1000)
153+
elif isinstance(ret, When):
154+
if callable(arg):
155+
func = arg
156+
else:
157+
assert False, "Argument to 'when' must be a function or method."
170158
elif isinstance(ret, IORead):
159+
cb.pend_throw(False)
171160
self.add_reader(arg, cb)
172161
continue
173162
elif isinstance(ret, IOWrite):
163+
cb.pend_throw(False)
174164
self.add_writer(arg, cb)
175165
continue
176166
elif isinstance(ret, IOReadDone):
@@ -202,18 +192,37 @@ def run_forever(self):
202192
if __debug__ and DEBUG:
203193
log.debug("Coroutine cancelled: %s", cb)
204194
continue
205-
206195
if func is not None:
207196
self._schedule_hp(func, cb)
197+
continue
198+
# Currently all syscalls don't return anything, so we don't
199+
# need to feed anything to the next invocation of coroutine.
200+
# If that changes, need to pass that value below.
201+
if low_priority:
202+
self.call_after_ms(delay, cb)
203+
elif delay:
204+
self.call_later_ms(delay, cb)
208205
else:
209-
# Currently all syscalls don't return anything, so we don't
210-
# need to feed anything to the next invocation of coroutine.
211-
# If that changes, need to pass that value below.
212-
if priority:
213-
self.call_later_ms(delay, cb)
214-
else:
215-
self.call_after_ms(delay, cb)
206+
self.call_soon(cb)
216207

208+
# Wait until next waitq task or I/O availability
209+
delay = 0
210+
if not self.runq:
211+
delay = -1
212+
tnow = self.time()
213+
if self.waitq:
214+
t = self.waitq.peektime()
215+
delay = time.ticks_diff(t, tnow)
216+
if delay < 0:
217+
delay = 0
218+
if self.lpq:
219+
t = self.lpq.peektime()
220+
lpdelay = time.ticks_diff(t, tnow)
221+
if lpdelay < 0:
222+
lpdelay = 0
223+
if lpdelay < delay or delay < 0:
224+
delay = lpdelay
225+
self.wait(delay)
217226

218227
# Low priority
219228
class AfterMs(SleepMs):
@@ -232,7 +241,7 @@ class When(SleepMs):
232241

233242
import uasyncio.core
234243
uasyncio.core._event_loop_class = PriorityEventLoop
235-
def get_event_loop(len=42, lpqlen=42):
244+
def get_event_loop(runq_len=16, waitq_len=16, lpqlen=16):
236245
if uasyncio.core._event_loop is None: # Add a q entry for lp_monitor()
237-
uasyncio.core._event_loop = uasyncio.core._event_loop_class(len + 1, lpqlen)
246+
uasyncio.core._event_loop = uasyncio.core._event_loop_class(runq_len, waitq_len, lpqlen)
238247
return uasyncio.core._event_loop

asyntest.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,24 +152,20 @@ async def run_event_test(lp):
152152
loop.create_task(run_lock(3, lock))
153153
print('Test Event class')
154154
event = asyn.Event(lp)
155-
print('got here')
156155
loop.create_task(eventset(event))
157-
print('gh1')
158156
await eventwait(event) # run_event_test runs fast until this point
159157
print('Event status {}'.format('Incorrect' if event.is_set() else 'OK'))
160158
print('Tasks complete')
161159

162160
def event_test(lp=True): # Option to use low priority scheduling
163161
printexp('''Test Lock class
164162
Test Event class
165-
got here
166-
gh1
167163
waiting for event
168164
run_lock 1 waiting for lock
165+
run_lock 1 acquired lock
169166
run_lock 2 waiting for lock
170167
run_lock 3 waiting for lock
171168
Waiting 5 secs before setting event
172-
run_lock 1 acquired lock
173169
run_lock 1 released lock
174170
run_lock 2 acquired lock
175171
run_lock 2 released lock

benchmarks/latency.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,14 @@ def test(use_priority=True):
102102
else:
103103
ntasks = max(num_coros) + 10 #4
104104
if use_priority:
105-
loop = asyncio.get_event_loop(ntasks, ntasks)
105+
loop = asyncio.get_event_loop(ntasks, ntasks, ntasks)
106106
after = asyncio.after
107107
after_ms = asyncio.after_ms
108108
else:
109109
lp_version = False
110110
after = asyncio.sleep
111111
after_ms = asyncio.sleep_ms
112-
loop = asyncio.get_event_loop(ntasks)
112+
loop = asyncio.get_event_loop(ntasks, ntasks)
113113
s = 'Testing latency of priority task with coros blocking for {}ms.'
114114
print(s.format(processing_delay))
115115
if lp_version:

benchmarks/rate.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
# rate.py Benchmark for uasyncio. Author Peter Hinch April 2017.
1+
# rate.py Benchmark for uasyncio. Author Peter Hinch Feb 2018.
22
# Benchmark uasyncio round-robin scheduling performance
33
# This measures the rate at which uasyncio can schedule a minimal coro which
44
# mereley increments a global.
55

6-
# Outcome: minimal coros are scheduled at an interval of ~208us, independent of
7-
# the number of instances.
6+
# Outcome: minimal coros are scheduled at an interval of ~150us
87

98
import uasyncio as asyncio
109

@@ -40,7 +39,8 @@ async def test():
4039
iterations[n] = count
4140
done = True
4241

43-
loop = asyncio.get_event_loop(max(num_coros) + 2)
42+
ntasks = max(num_coros) + 2
43+
loop = asyncio.get_event_loop(ntasks, ntasks)
4444
loop.create_task(test())
4545
loop.run_until_complete(report())
4646

0 commit comments

Comments
 (0)