Skip to content

Commit 64ef6e5

Browse files
committed
Rewrite Message class.
1 parent c2f9259 commit 64ef6e5

File tree

3 files changed

+185
-67
lines changed

3 files changed

+185
-67
lines changed

v3/docs/TUTORIAL.md

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,15 +1008,14 @@ asyncio.run(queue_go(4))
10081008

10091009
## 3.6 ThreadSafeFlag
10101010

1011-
This requires firmware V1.15 or later.
10121011
See also [Interfacing uasyncio to interrupts](./INTERRUPTS.md). Because of
10131012
[this issue](https://github.com/micropython/micropython/issues/7965) the
10141013
`ThreadSafeFlag` class does not work under the Unix build.
10151014

10161015
This official class provides an efficient means of synchronising a task with a
10171016
truly asynchronous event such as a hardware interrupt service routine or code
1018-
running in another thread. It operates in a similar way to `Event` with the
1019-
following key differences:
1017+
running in another thread or on another core. It operates in a similar way to
1018+
`Event` with the following key differences:
10201019
* It is thread safe: the `set` event may be called from asynchronous code.
10211020
* It is self-clearing.
10221021
* Only one task may wait on the flag.
@@ -1094,26 +1093,39 @@ hardware device requires the use of an ISR for a μs level response. Having
10941093
serviced the device, the ISR flags an asynchronous routine, typically
10951094
processing received data.
10961095

1097-
The fact that only one task may wait on a `ThreadSafeFlag` may be addressed by
1098-
having a task that waits on the `ThreadSafeFlag` set an `Event` as in the
1099-
following:
1096+
The fact that only one task may wait on a `ThreadSafeFlag` may be addressed as
1097+
follows.
11001098
```python
11011099
class ThreadSafeEvent(asyncio.Event):
11021100
def __init__(self):
11031101
super().__init__()
1102+
self._waiting_on_tsf = False
11041103
self._tsf = asyncio.ThreadSafeFlag()
1105-
asyncio.create_task(self._run())
1106-
1107-
async def _run(self):
1108-
while True:
1109-
await self._tsf.wait()
1110-
super().set()
11111104

11121105
def set(self):
11131106
self._tsf.set()
1107+
1108+
async def _waiter(self): # Runs if 1st task is cancelled
1109+
await self._tsf.wait()
1110+
super().set()
1111+
self._waiting_on_tsf = False
1112+
1113+
async def wait(self):
1114+
if self._waiting_on_tsf == False:
1115+
self._waiting_on_tsf = True
1116+
await asyncio.sleep(0) # Ensure other tasks see updated flag
1117+
try:
1118+
await self._tsf.wait()
1119+
super().set()
1120+
self._waiting_on_tsf = False
1121+
except asyncio.CancelledError:
1122+
asyncio.create_task(self._waiter())
1123+
raise # Pass cancellation to calling code
1124+
else:
1125+
await super().wait()
11141126
```
1115-
An instance may be set by a hard ISR or from another thread/core. It must
1116-
explicitly be cleared. Multiple tasks may wait on it.
1127+
An instance may be set by a hard ISR or from another thread/core. As an `Event`
1128+
it can support multiple tasks and must explicitly be cleared.
11171129

11181130
###### [Contents](./TUTORIAL.md#contents)
11191131

@@ -1331,9 +1343,8 @@ finally:
13311343

13321344
## 3.9 Message
13331345

1334-
This requires firmware V1.15 or later. Note that because of
1335-
[this issue](https://github.com/micropython/micropython/issues/7965) the
1336-
`Message` class does not work under the Unix build.
1346+
Because of [this issue](https://github.com/micropython/micropython/issues/7965)
1347+
the `Message` class does not work under the Unix build.
13371348

13381349
This is an unofficial primitive with no counterpart in CPython asyncio. It uses
13391350
[ThreadSafeFlag](./TUTORIAL.md#36-threadsafeflag) to provide an object similar
@@ -1345,6 +1356,7 @@ It is similar to the `Event` class. It differs in that:
13451356
* `.set()` is capable of being called from a hard or soft interrupt service
13461357
routine.
13471358
* It is an awaitable class.
1359+
* It can be used in an asynchronous iterator.
13481360
* The logic of `.clear` differs: it must be called by at least one task which
13491361
waits on the `Message`.
13501362

@@ -1421,6 +1433,16 @@ async def main():
14211433

14221434
asyncio.run(main())
14231435
```
1436+
Receiving messages in an asynchronous iterator:
1437+
```python
1438+
msg = Message()
1439+
asyncio.create_task(send_data(msg))
1440+
async for data in msg:
1441+
# process data
1442+
msg.clear()
1443+
```
1444+
The `Message` class does not have a queue: if the instance is set, then set
1445+
again before it is accessed, the first data item will be lost.
14241446

14251447
## 3.10 Synchronising to hardware
14261448

v3/primitives/message.py

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,73 @@
11
# message.py
22
# Now uses ThreadSafeFlag for efficiency
33

4-
# Copyright (c) 2018-2021 Peter Hinch
4+
# Copyright (c) 2018-2022 Peter Hinch
55
# Released under the MIT License (MIT) - see LICENSE file
66

77
# Usage:
88
# from primitives.message import Message
9-
10-
try:
9+
# See https://github.com/micropython/micropython/issues/7965 for code below
10+
import sys
11+
ok = hasattr(sys.implementation, "_machine") # MicroPython
12+
if ok:
13+
ok = "linux" not in sys.implementation._machine
14+
if ok:
1115
import uasyncio as asyncio
12-
except ImportError:
13-
import asyncio
16+
else:
17+
print("Message is MicroPython only, and not on Unix build.")
18+
sys.exit(1)
1419

1520
# A coro waiting on a message issues await message
1621
# A coro or hard/soft ISR raising the message issues.set(payload)
1722
# .clear() should be issued by at least one waiting task and before
1823
# next event.
1924

20-
class Message(asyncio.ThreadSafeFlag):
21-
def __init__(self, _=0): # Arg: poll interval. Compatibility with old code.
22-
self._evt = asyncio.Event()
23-
self._data = None # Message
24-
self._state = False # Ensure only one task waits on ThreadSafeFlag
25-
self._is_set = False # For .is_set()
25+
class Message(asyncio.Event):
26+
def __init__(self):
2627
super().__init__()
28+
self._waiting_on_tsf = False
29+
self._tsf = asyncio.ThreadSafeFlag()
30+
self._data = None # Message
31+
self._is_set = False
2732

2833
def clear(self): # At least one task must call clear when scheduled
29-
self._state = False
3034
self._is_set = False
35+
super().clear()
3136

3237
def __iter__(self):
3338
yield from self.wait()
3439
return self._data
3540

41+
async def _waiter(self): # Runs if 1st task is cancelled
42+
await self._tsf.wait()
43+
super().set()
44+
self._waiting_on_tsf = False
45+
3646
async def wait(self):
37-
if self._state: # A task waits on ThreadSafeFlag
38-
await self._evt.wait() # Wait on event
39-
else: # First task to wait
40-
self._state = True
41-
# Ensure other tasks see updated ._state before they wait
42-
await asyncio.sleep_ms(0)
43-
await super().wait() # Wait on ThreadSafeFlag
44-
self._evt.set()
45-
self._evt.clear()
47+
if self._waiting_on_tsf == False:
48+
self._waiting_on_tsf = True
49+
await asyncio.sleep(0) # Ensure other tasks see updated flag
50+
try:
51+
await self._tsf.wait()
52+
super().set()
53+
self._waiting_on_tsf = False
54+
except asyncio.CancelledError:
55+
asyncio.create_task(self._waiter())
56+
raise # Pass cancellation to calling code
57+
else:
58+
await super().wait()
4659
return self._data
4760

4861
def set(self, data=None): # Can be called from a hard ISR
4962
self._data = data
5063
self._is_set = True
51-
super().set()
64+
self._tsf.set()
65+
66+
def __aiter__(self):
67+
return self
68+
69+
async def __anext__(self):
70+
return await self
5271

5372
def is_set(self):
5473
return self._is_set

0 commit comments

Comments
 (0)