|
| 1 | +# iotest4.py Test PR #3836. Demonstrate the anomaly with a read/write device. |
| 2 | +# User class write() performs unbuffered writing. |
| 3 | +# For simplicity this uses buffered read: unbuffered is tested by iotest2.py. |
| 4 | + |
| 5 | +# Run iotest4.test() to see expected output |
| 6 | +# iotest4.test(False) to demonstrate the issue. |
| 7 | + |
| 8 | +# Pass/Fail is determined by whether the StreamReader and StreamWriter operate |
| 9 | +# on the same (fail) or different (pass) objects. |
| 10 | +# I suspect that the issue is with select/ipoll (uasyncio __init__.py) |
| 11 | +# The fault is either in select/poll or uasyncio __init__.py. |
| 12 | +# As soon as PollEventLoop.add_writer() is called, reading stops. |
| 13 | +# PollEventLoop.add_writer() is called when StreamWriter.awrite() issues |
| 14 | +# yield IOWrite(self.s), which for unbuffered devices is after the 1st char |
| 15 | +# of a multi-char buf is written. |
| 16 | + |
| 17 | +import io, pyb |
| 18 | +import uasyncio as asyncio |
| 19 | +import micropython |
| 20 | +micropython.alloc_emergency_exception_buf(100) |
| 21 | + |
| 22 | +MP_STREAM_POLL_RD = const(1) |
| 23 | +MP_STREAM_POLL_WR = const(4) |
| 24 | +MP_STREAM_POLL = const(3) |
| 25 | +MP_STREAM_ERROR = const(-1) |
| 26 | + |
| 27 | +def printbuf(this_io): |
| 28 | + print(this_io.wbuf[:this_io.wprint_len]) |
| 29 | + |
| 30 | +class MyIO(io.IOBase): |
| 31 | + def __init__(self, read=False, write=False): |
| 32 | + if read: |
| 33 | + self.ready_rd = False |
| 34 | + self.rbuf = b'ready\n' # Read buffer |
| 35 | + pyb.Timer(4, freq = 1, callback = self.do_input) |
| 36 | + if write: |
| 37 | + self.wbuf = bytearray(100) # Write buffer |
| 38 | + self.wprint_len = 0 |
| 39 | + self.widx = 0 |
| 40 | + self.wch = b'' |
| 41 | + pyb.Timer(5, freq = 10, callback = self.do_output) |
| 42 | + |
| 43 | + # Read callback: emulate asynchronous input from hardware. |
| 44 | + # Typically would put bytes into a ring buffer and set .ready_rd. |
| 45 | + def do_input(self, t): |
| 46 | + self.ready_rd = True # Data is ready to read |
| 47 | + |
| 48 | + # Write timer callback. Emulate hardware: if there's data in the buffer |
| 49 | + # write some or all of it |
| 50 | + def do_output(self, t): |
| 51 | + if self.wch: |
| 52 | + self.wbuf[self.widx] = self.wch |
| 53 | + self.widx += 1 |
| 54 | + if self.wch == ord('\n'): |
| 55 | + self.wprint_len = self.widx # Save for schedule |
| 56 | + micropython.schedule(printbuf, self) |
| 57 | + self.widx = 0 |
| 58 | + self.wch = b'' |
| 59 | + |
| 60 | + |
| 61 | + def ioctl(self, req, arg): # see ports/stm32/uart.c |
| 62 | + ret = MP_STREAM_ERROR |
| 63 | + if req == MP_STREAM_POLL: |
| 64 | + ret = 0 |
| 65 | + if arg & MP_STREAM_POLL_RD: |
| 66 | + if self.ready_rd: |
| 67 | + ret |= MP_STREAM_POLL_RD |
| 68 | + if arg & MP_STREAM_POLL_WR: |
| 69 | + if not self.wch: |
| 70 | + ret |= MP_STREAM_POLL_WR # Ready if no char pending |
| 71 | + return ret |
| 72 | + |
| 73 | + # Emulate a device with buffered read. Return the buffer, falsify read ready |
| 74 | + # Read timer sets ready. |
| 75 | + def readline(self): |
| 76 | + self.ready_rd = False |
| 77 | + return self.rbuf |
| 78 | + |
| 79 | + # Emulate unbuffered hardware which writes one character: uasyncio waits |
| 80 | + # until hardware is ready for the next. Hardware ready is emulated by write |
| 81 | + # timer callback. |
| 82 | + def write(self, buf, off, sz): |
| 83 | + self.wch = buf[off] # Hardware starts to write a char |
| 84 | + return 1 # 1 byte written. uasyncio waits on ioctl write ready |
| 85 | + |
| 86 | +async def receiver(myior): |
| 87 | + sreader = asyncio.StreamReader(myior) |
| 88 | + while True: |
| 89 | + res = await sreader.readline() |
| 90 | + print('Received', res) |
| 91 | + |
| 92 | +async def sender(myiow): |
| 93 | + swriter = asyncio.StreamWriter(myiow, {}) |
| 94 | + await asyncio.sleep(5) |
| 95 | + count = 0 |
| 96 | + while True: |
| 97 | + count += 1 |
| 98 | + tosend = 'Wrote Hello MyIO {}\n'.format(count) |
| 99 | + await swriter.awrite(tosend.encode('UTF8')) |
| 100 | + await asyncio.sleep(2) |
| 101 | + |
| 102 | +def test(good=True): |
| 103 | + if good: |
| 104 | + myior = MyIO(read=True) |
| 105 | + myiow = MyIO(write=True) |
| 106 | + else: |
| 107 | + myior = MyIO(read=True, write=True) |
| 108 | + myiow = myior |
| 109 | + loop = asyncio.get_event_loop() |
| 110 | + loop.create_task(receiver(myior)) |
| 111 | + loop.create_task(sender(myiow)) |
| 112 | + loop.run_forever() |
0 commit comments