Skip to content

Commit ac44723

Browse files
committed
Reinstate low priority coros. Not yet documented.
1 parent 2709c56 commit ac44723

File tree

3 files changed

+70
-8
lines changed

3 files changed

+70
-8
lines changed

FASTPOLL.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ one line of code compared to an application running under the official version.
2828
The high priority mechanism formerly provided in `asyncio_priority.py` is
2929
replaced with a faster and more efficient way of handling asynchronous events
3030
with minimum latency. Consequently `asyncio_priority.py` is obsolete and should
31-
be deleted from your system. The facility for low priority coros is currently
32-
unavailable but will be reinstated.
31+
be deleted from your system.
32+
33+
The facility for low priority coros formerly provided by `asyncio_priority.py`
34+
exists but is not yet documented.
3335

3436
This modified version also provides for ultra low power consumption using a
3537
module documented [here](./lowpower/README.md).

fast_io/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ def set_debug(val):
1919
# to add_reader. Cand we fix this by maintaining two object maps?
2020
class PollEventLoop(EventLoop):
2121

22-
def __init__(self, runq_len=16, waitq_len=16, fast_io=0):
23-
EventLoop.__init__(self, runq_len, waitq_len, fast_io)
22+
def __init__(self, runq_len=16, waitq_len=16, fast_io=0, lp_len=0):
23+
EventLoop.__init__(self, runq_len, waitq_len, fast_io, lp_len)
2424
self.poller = select.poll()
2525
self.rdobjmap = {}
2626
self.wrobjmap = {}

fast_io/core.py

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ class TimeoutError(CancelledError):
3333

3434
class EventLoop:
3535

36-
def __init__(self, runq_len=16, waitq_len=16, ioq_len=0):
36+
def __init__(self, runq_len=16, waitq_len=16, ioq_len=0, lp_len=0):
3737
self.runq = ucollections.deque((), runq_len, True)
38+
self._max_overdue_ms = 0
39+
self.lpq = utimeq.utimeq(lp_len) if lp_len else None
3840
self.ioq_len = ioq_len
3941
if ioq_len:
4042
self.ioq = ucollections.deque((), ioq_len, True)
@@ -64,6 +66,24 @@ def _call_now(self, callback, *args): # For stream I/O only
6466
if not isinstance(callback, type_gen):
6567
self.ioq.append(args)
6668

69+
def max_overdue_ms(self, t=None):
70+
if t is not None:
71+
self._max_overdue_ms = int(t)
72+
return self._max_overdue_ms
73+
74+
# Low priority versions of call_later() call_later_ms() and call_at_()
75+
def call_after_ms(self, delay, callback, *args):
76+
self.call_at_lp_(time.ticks_add(self.time(), delay), callback, *args)
77+
78+
def call_after(self, delay, callback, *args):
79+
self.call_at_lp_(time.ticks_add(self.time(), int(delay * 1000)), callback, *args)
80+
81+
def call_at_lp_(self, time, callback, *args):
82+
if self.lpq is not None:
83+
self.lpq.push(time, callback, args)
84+
else:
85+
raise OSError('No low priority queue exists.')
86+
6787
def call_soon(self, callback, *args):
6888
if __debug__ and DEBUG:
6989
log.debug("Scheduling in runq: %s", (callback, args))
@@ -96,6 +116,22 @@ def run_forever(self):
96116
while True:
97117
# Expire entries in waitq and move them to runq
98118
tnow = self.time()
119+
if self.lpq:
120+
# Schedule a LP task if overdue or if no normal task is ready
121+
to_run = False # Assume no LP task is to run
122+
t = self.lpq.peektime()
123+
tim = time.ticks_diff(t, tnow)
124+
to_run = self._max_overdue_ms > 0 and tim < -self._max_overdue_ms
125+
if not (to_run or self.runq): # No overdue LP task or task on runq
126+
# zero delay tasks go straight to runq. So don't schedule LP if runq
127+
to_run = tim <= 0 # True if LP task is due
128+
if to_run and self.waitq: # Set False if normal tasks due.
129+
t = self.waitq.peektime()
130+
to_run = time.ticks_diff(t, tnow) > 0 # No normal task is ready
131+
if to_run:
132+
self.lpq.pop(cur_task)
133+
self.call_soon(cur_task[1], *cur_task[2])
134+
99135
while self.waitq:
100136
t = self.waitq.peektime()
101137
delay = time.ticks_diff(t, tnow)
@@ -139,6 +175,7 @@ def run_forever(self):
139175
log.info("Next coroutine to run: %s", (cb, args))
140176
self.cur_task = cb # Stored in a bound variable for TimeoutObj
141177
delay = 0
178+
low_priority = False # Assume normal priority
142179
try:
143180
if args is ():
144181
ret = next(cb) # Schedule the coro, get result
@@ -150,6 +187,10 @@ def run_forever(self):
150187
arg = ret.arg
151188
if isinstance(ret, SleepMs):
152189
delay = arg
190+
if isinstance(ret, AfterMs):
191+
low_priority = True
192+
if isinstance(ret, After):
193+
delay = int(delay*1000)
153194
elif isinstance(ret, IORead): # coro was a StreamReader read method
154195
cb.pend_throw(False) # Why? I think this is for debugging. If it is scheduled other than by wait
155196
# (which does pend_throw(None) an exception (exception doesn't inherit from Exception) is thrown
@@ -194,7 +235,9 @@ def run_forever(self):
194235
# Currently all syscalls don't return anything, so we don't
195236
# need to feed anything to the next invocation of coroutine.
196237
# If that changes, need to pass that value below.
197-
if delay:
238+
if low_priority:
239+
self.call_after_ms(delay, cb) # Put on lpq
240+
elif delay:
198241
self.call_later_ms(delay, cb)
199242
else:
200243
self.call_soon(cb)
@@ -209,6 +252,13 @@ def run_forever(self):
209252
delay = time.ticks_diff(t, tnow)
210253
if delay < 0:
211254
delay = 0
255+
if self.lpq:
256+
t = self.lpq.peektime()
257+
lpdelay = time.ticks_diff(t, tnow)
258+
if lpdelay < 0:
259+
lpdelay = 0
260+
if lpdelay < delay or delay < 0:
261+
delay = lpdelay # waitq is empty or lp task is more current
212262
self.wait(delay)
213263

214264
def run_until_complete(self, coro):
@@ -258,10 +308,10 @@ class IOWriteDone(SysCall1):
258308

259309
_event_loop = None
260310
_event_loop_class = EventLoop
261-
def get_event_loop(runq_len=16, waitq_len=16, ioq_len=0):
311+
def get_event_loop(runq_len=16, waitq_len=16, ioq_len=0, lp_len=0):
262312
global _event_loop
263313
if _event_loop is None:
264-
_event_loop = _event_loop_class(runq_len, waitq_len, ioq_len)
314+
_event_loop = _event_loop_class(runq_len, waitq_len, ioq_len, lp_len)
265315
return _event_loop
266316

267317
# Allow user classes to determine prior event loop instantiation.
@@ -342,6 +392,16 @@ def wait_for(coro, timeout):
342392
def coroutine(f):
343393
return f
344394

395+
# Low priority
396+
class AfterMs(SleepMs):
397+
pass
398+
399+
class After(AfterMs):
400+
pass
401+
402+
after_ms = AfterMs()
403+
after = After()
404+
345405
#
346406
# The functions below are deprecated in uasyncio, and provided only
347407
# for compatibility with CPython asyncio

0 commit comments

Comments
 (0)