Skip to content

Commit 67bf35f

Browse files
committed
V0.24 Fast response to cancellation and timeout.
1 parent 96c501e commit 67bf35f

File tree

3 files changed

+350
-0
lines changed

3 files changed

+350
-0
lines changed

fast_io/fast_can_test.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# fast_can_test.py Test of cancellation of tasks which call sleep
2+
3+
# Copyright (c) Peter Hinch 2019
4+
# Released under the MIT licence
5+
6+
import uasyncio as asyncio
7+
import sys
8+
ermsg = 'This test requires the fast_io version of uasyncio V2.4 or later.'
9+
try:
10+
print('Uasyncio version', asyncio.version)
11+
if not isinstance(asyncio.version, tuple):
12+
print(ermsg)
13+
sys.exit(0)
14+
except AttributeError:
15+
print(ermsg)
16+
sys.exit(0)
17+
18+
# If a task times out the TimeoutError can't be trapped:
19+
# no exception is thrown to the task
20+
21+
async def foo(t):
22+
try:
23+
print('foo started')
24+
await asyncio.sleep(t)
25+
print('foo ended', t)
26+
except asyncio.CancelledError:
27+
print('foo cancelled', t)
28+
29+
async def lpfoo(t):
30+
try:
31+
print('lpfoo started')
32+
await asyncio.after(t)
33+
print('lpfoo ended', t)
34+
except asyncio.CancelledError:
35+
print('lpfoo cancelled', t)
36+
37+
async def run(coro, t):
38+
await asyncio.wait_for(coro, t)
39+
40+
async def bar(loop):
41+
foo1 = foo(1)
42+
foo5 = foo(5)
43+
lpfoo1 = lpfoo(1)
44+
lpfoo5 = lpfoo(5)
45+
loop.create_task(foo1)
46+
loop.create_task(foo5)
47+
loop.create_task(lpfoo1)
48+
loop.create_task(lpfoo5)
49+
await asyncio.sleep(2)
50+
print('Cancelling tasks')
51+
asyncio.cancel(foo1)
52+
asyncio.cancel(foo5)
53+
asyncio.cancel(lpfoo1)
54+
asyncio.cancel(lpfoo5)
55+
await asyncio.sleep(0) # Allow cancellation to occur
56+
print('Pausing 7s to ensure no task still running.')
57+
await asyncio.sleep(7)
58+
print('Launching tasks with 2s timeout')
59+
loop.create_task(run(foo(1), 2))
60+
loop.create_task(run(lpfoo(1), 2))
61+
loop.create_task(run(foo(20), 2))
62+
loop.create_task(run(lpfoo(20), 2))
63+
print('Pausing 7s to ensure no task still running.')
64+
await asyncio.sleep(7)
65+
66+
loop = asyncio.get_event_loop(ioq_len=16, lp_len=16)
67+
loop.run_until_complete(bar(loop))

fast_io/iorw_can.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# iorw_can.py Emulate a device which can read and write one character at a time
2+
# and test cancellation.
3+
4+
# Copyright (c) Peter Hinch 2019
5+
# Released under the MIT licence
6+
7+
# This requires the modified version of uasyncio (fast_io directory).
8+
# Slow hardware is emulated using timers.
9+
# MyIO.write() ouputs a single character and sets the hardware not ready.
10+
# MyIO.readline() returns a single character and sets the hardware not ready.
11+
# Timers asynchronously set the hardware ready.
12+
13+
import io, pyb
14+
import uasyncio as asyncio
15+
import micropython
16+
import sys
17+
try:
18+
print('Uasyncio version', asyncio.version)
19+
if not isinstance(asyncio.version, tuple):
20+
print('Please use fast_io version 0.24 or later.')
21+
sys.exit(0)
22+
except AttributeError:
23+
print('ERROR: This test requires the fast_io version. It will not run correctly')
24+
print('under official uasyncio V2.0 owing to a bug which prevents concurrent')
25+
print('input and output.')
26+
sys.exit(0)
27+
28+
print('Issue iorw_can.test(True) to test ioq, iorw_can.test() to test runq.')
29+
print('Tasks time out after 15s.')
30+
print('Issue ctrl-d after each run.')
31+
32+
micropython.alloc_emergency_exception_buf(100)
33+
34+
MP_STREAM_POLL_RD = const(1)
35+
MP_STREAM_POLL_WR = const(4)
36+
MP_STREAM_POLL = const(3)
37+
MP_STREAM_ERROR = const(-1)
38+
39+
def printbuf(this_io):
40+
print(bytes(this_io.wbuf[:this_io.wprint_len]).decode(), end='')
41+
42+
class MyIO(io.IOBase):
43+
def __init__(self, read=False, write=False):
44+
self.ready_rd = False # Read and write not ready
45+
self.rbuf = b'ready\n' # Read buffer
46+
self.ridx = 0
47+
pyb.Timer(4, freq = 5, callback = self.do_input)
48+
self.wch = b''
49+
self.wbuf = bytearray(100) # Write buffer
50+
self.wprint_len = 0
51+
self.widx = 0
52+
pyb.Timer(5, freq = 10, callback = self.do_output)
53+
54+
# Read callback: emulate asynchronous input from hardware.
55+
# Typically would put bytes into a ring buffer and set .ready_rd.
56+
def do_input(self, t):
57+
self.ready_rd = True # Data is ready to read
58+
59+
# Write timer callback. Emulate hardware: if there's data in the buffer
60+
# write some or all of it
61+
def do_output(self, t):
62+
if self.wch:
63+
self.wbuf[self.widx] = self.wch
64+
self.widx += 1
65+
if self.wch == ord('\n'):
66+
self.wprint_len = self.widx # Save for schedule
67+
micropython.schedule(printbuf, self)
68+
self.widx = 0
69+
self.wch = b''
70+
71+
72+
def ioctl(self, req, arg): # see ports/stm32/uart.c
73+
ret = MP_STREAM_ERROR
74+
if req == MP_STREAM_POLL:
75+
ret = 0
76+
if arg & MP_STREAM_POLL_RD:
77+
if self.ready_rd:
78+
ret |= MP_STREAM_POLL_RD
79+
if arg & MP_STREAM_POLL_WR:
80+
if not self.wch:
81+
ret |= MP_STREAM_POLL_WR # Ready if no char pending
82+
return ret
83+
84+
# Test of device that produces one character at a time
85+
def readline(self):
86+
self.ready_rd = False # Cleared by timer cb do_input
87+
ch = self.rbuf[self.ridx]
88+
if ch == ord('\n'):
89+
self.ridx = 0
90+
else:
91+
self.ridx += 1
92+
return chr(ch)
93+
94+
# Emulate unbuffered hardware which writes one character: uasyncio waits
95+
# until hardware is ready for the next. Hardware ready is emulated by write
96+
# timer callback.
97+
def write(self, buf, off, sz):
98+
self.wch = buf[off] # Hardware starts to write a char
99+
return 1 # 1 byte written. uasyncio waits on ioctl write ready
100+
101+
# Note that trapping the exception and returning is still mandatory.
102+
async def receiver(myior):
103+
sreader = asyncio.StreamReader(myior)
104+
try:
105+
while True:
106+
res = await sreader.readline()
107+
print('Received', res)
108+
except asyncio.CancelledError:
109+
print('Receiver cancelled')
110+
111+
async def sender(myiow):
112+
swriter = asyncio.StreamWriter(myiow, {})
113+
await asyncio.sleep(1)
114+
count = 0
115+
try: # Trap in outermost scope to catch cancellation of .sleep
116+
while True:
117+
count += 1
118+
tosend = 'Wrote Hello MyIO {}\n'.format(count)
119+
await swriter.awrite(tosend.encode('UTF8'))
120+
await asyncio.sleep(2)
121+
except asyncio.CancelledError:
122+
print('Sender cancelled')
123+
124+
async def cannem(coros, t):
125+
await asyncio.sleep(t)
126+
for coro in coros:
127+
asyncio.cancel(coro)
128+
await asyncio.sleep(1)
129+
130+
def test(ioq=False):
131+
myio = MyIO()
132+
if ioq:
133+
loop = asyncio.get_event_loop(ioq_len=16)
134+
else:
135+
loop = asyncio.get_event_loop()
136+
rx = receiver(myio)
137+
tx = sender(myio)
138+
loop.create_task(rx)
139+
loop.create_task(tx)
140+
loop.run_until_complete(cannem((rx, tx), 15))

fast_io/iorw_to.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# iorw_to.py Emulate a device which can read and write one character at a time
2+
# and test timeouts.
3+
4+
# Copyright (c) Peter Hinch 2019
5+
# Released under the MIT licence
6+
7+
# This requires the modified version of uasyncio (fast_io directory).
8+
# Slow hardware is emulated using timers.
9+
# MyIO.write() ouputs a single character and sets the hardware not ready.
10+
# MyIO.readline() returns a single character and sets the hardware not ready.
11+
# Timers asynchronously set the hardware ready.
12+
13+
import io, pyb
14+
import uasyncio as asyncio
15+
import micropython
16+
import sys
17+
try:
18+
print('Uasyncio version', asyncio.version)
19+
if not isinstance(asyncio.version, tuple):
20+
print('Please use fast_io version 0.24 or later.')
21+
sys.exit(0)
22+
except AttributeError:
23+
print('ERROR: This test requires the fast_io version. It will not run correctly')
24+
print('under official uasyncio V2.0 owing to a bug which prevents concurrent')
25+
print('input and output.')
26+
sys.exit(0)
27+
28+
print('Issue iorw_to.test(True) to test ioq, iorw_to.test() to test runq.')
29+
print('Test runs until interrupted. Tasks time out after 15s.')
30+
print('Issue ctrl-d after each run.')
31+
32+
micropython.alloc_emergency_exception_buf(100)
33+
34+
MP_STREAM_POLL_RD = const(1)
35+
MP_STREAM_POLL_WR = const(4)
36+
MP_STREAM_POLL = const(3)
37+
MP_STREAM_ERROR = const(-1)
38+
39+
def printbuf(this_io):
40+
print(bytes(this_io.wbuf[:this_io.wprint_len]).decode(), end='')
41+
42+
class MyIO(io.IOBase):
43+
def __init__(self, read=False, write=False):
44+
self.ready_rd = False # Read and write not ready
45+
self.rbuf = b'ready\n' # Read buffer
46+
self.ridx = 0
47+
pyb.Timer(4, freq = 5, callback = self.do_input)
48+
self.wch = b''
49+
self.wbuf = bytearray(100) # Write buffer
50+
self.wprint_len = 0
51+
self.widx = 0
52+
pyb.Timer(5, freq = 10, callback = self.do_output)
53+
54+
# Read callback: emulate asynchronous input from hardware.
55+
# Typically would put bytes into a ring buffer and set .ready_rd.
56+
def do_input(self, t):
57+
self.ready_rd = True # Data is ready to read
58+
59+
# Write timer callback. Emulate hardware: if there's data in the buffer
60+
# write some or all of it
61+
def do_output(self, t):
62+
if self.wch:
63+
self.wbuf[self.widx] = self.wch
64+
self.widx += 1
65+
if self.wch == ord('\n'):
66+
self.wprint_len = self.widx # Save for schedule
67+
micropython.schedule(printbuf, self)
68+
self.widx = 0
69+
self.wch = b''
70+
71+
72+
def ioctl(self, req, arg): # see ports/stm32/uart.c
73+
ret = MP_STREAM_ERROR
74+
if req == MP_STREAM_POLL:
75+
ret = 0
76+
if arg & MP_STREAM_POLL_RD:
77+
if self.ready_rd:
78+
ret |= MP_STREAM_POLL_RD
79+
if arg & MP_STREAM_POLL_WR:
80+
if not self.wch:
81+
ret |= MP_STREAM_POLL_WR # Ready if no char pending
82+
return ret
83+
84+
# Test of device that produces one character at a time
85+
def readline(self):
86+
self.ready_rd = False # Cleared by timer cb do_input
87+
ch = self.rbuf[self.ridx]
88+
if ch == ord('\n'):
89+
self.ridx = 0
90+
else:
91+
self.ridx += 1
92+
return chr(ch)
93+
94+
# Emulate unbuffered hardware which writes one character: uasyncio waits
95+
# until hardware is ready for the next. Hardware ready is emulated by write
96+
# timer callback.
97+
def write(self, buf, off, sz):
98+
self.wch = buf[off] # Hardware starts to write a char
99+
return 1 # 1 byte written. uasyncio waits on ioctl write ready
100+
101+
# Note that trapping the exception and returning is still mandatory.
102+
async def receiver(myior):
103+
sreader = asyncio.StreamReader(myior)
104+
try:
105+
while True:
106+
res = await sreader.readline()
107+
print('Received', res)
108+
except asyncio.TimeoutError:
109+
print('Receiver timeout')
110+
111+
async def sender(myiow):
112+
swriter = asyncio.StreamWriter(myiow, {})
113+
await asyncio.sleep(1)
114+
count = 0
115+
try: # Trap in outermost scope to catch cancellation of .sleep
116+
while True:
117+
count += 1
118+
tosend = 'Wrote Hello MyIO {}\n'.format(count)
119+
await swriter.awrite(tosend.encode('UTF8'))
120+
await asyncio.sleep(2)
121+
except asyncio.TimeoutError:
122+
print('Sender timeout')
123+
124+
async def run(coro, t):
125+
await asyncio.wait_for_ms(coro, t)
126+
127+
async def do_test(loop, t):
128+
myio = MyIO()
129+
while True:
130+
tr = t * 1000 + (pyb.rng() >> 20) # Add ~1s uncertainty
131+
tw = t * 1000 + (pyb.rng() >> 20)
132+
print('Timeouts: {:7.3f}s read {:7.3f}s write'.format(tr/1000, tw/1000))
133+
loop.create_task(run(receiver(myio), tr))
134+
await run(sender(myio), tw)
135+
await asyncio.sleep(2) # Wait out timing randomness
136+
137+
def test(ioq=False):
138+
if ioq:
139+
loop = asyncio.get_event_loop(ioq_len=16)
140+
else:
141+
loop = asyncio.get_event_loop()
142+
loop.create_task(do_test(loop, 15))
143+
loop.run_forever()

0 commit comments

Comments
 (0)