
Commit 732c888

Threadsafe queue has asynchronous get method.

1 parent 4549dc4

File tree: 2 files changed (+93 −14 lines)

v3/docs/THREADING.md

Lines changed: 90 additions & 14 deletions
# Linking uasyncio and other contexts

# 1. Introduction

This document identifies issues arising when `uasyncio` applications interface
with code running in a different context. Supported contexts are:
1. An interrupt service routine (ISR).
2. Another thread running on the same core.
3. Code running on a different core (currently only supported on RP2).
MicroPython (MP) ISR will not interrupt execution of a line of Python code.

This is not the case where the threads run on different cores, where there is
no synchronisation between the streams of machine code. If the two threads
concurrently modify a shared Python object, it is possible that corruption will
occur. Reading an object while it is being written can also produce an
unpredictable outcome.

A key practical point is that coding errors can be hard to identify: the
consequences can be extremely rare bugs or crashes.

There are two fundamental problems: data sharing and synchronisation.

# 2. Data sharing

The simplest case is a shared pool of data. It is possible to share an `int` or
`bool` because at machine code level writing an `int` is "atomic": it cannot be
interrupted. Anything more complex must be protected to ensure that concurrent
access cannot take place. The consequences even of reading an object while it
is being written can be unpredictable. One approach is to use locking:

```python
import _thread
import time

lock = _thread.allocate_lock()
values = {"X": 0, "Y": 0, "Z": 0}

def producer():
    while True:
        lock.acquire()
        values["X"] = sensor_read(0)
        values["Y"] = sensor_read(1)
        values["Z"] = sensor_read(2)
        lock.release()
        time.sleep_ms(100)

_thread.start_new_thread(producer, ())

async def consumer():
    while True:
        lock.acquire()
        await process(values)  # Do something with the data
        lock.release()
```
This will work even for the multi-core case. However the consumer might hold
the lock for some time: it will take time for the scheduler to execute the
`process()` call, and the call itself will take time to run. This would be
problematic if the producer were an ISR.

In cases such as this a `ThreadSafeQueue` is more appropriate, as it decouples
producer and consumer code.
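The decoupling idea can be sketched in plain CPython, where a stdlib `queue.Queue` plays the role of the thread-safe queue (this is an illustrative analogue, not the MicroPython `ThreadSafeQueue`; the producer's data is a made-up placeholder). The consumer never blocks the event loop: it polls non-blockingly and yields while the queue is empty.

```python
import asyncio
import queue
import threading
import time

q = queue.Queue(maxsize=10)  # stdlib queue.Queue is thread-safe

def producer(n):
    # A real producer would loop forever reading a sensor; here we
    # send n placeholder readings and stop.
    for i in range(n):
        q.put(("reading", i))  # blocks only the producer thread if full
        time.sleep(0.01)

async def consumer(n):
    items = []
    while len(items) < n:
        try:
            items.append(q.get_nowait())  # non-blocking retrieval
        except queue.Empty:
            await asyncio.sleep(0.01)     # yield to the scheduler while empty
    return items

threading.Thread(target=producer, args=(3,), daemon=True).start()
result = asyncio.run(consumer(3))
print(result)
```

The producer and consumer share no locks in user code; all synchronisation is inside the queue, which is the property `ThreadSafeQueue` provides for `uasyncio`.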

# 2. Threadsafe Event

communication is required between the two contexts, two `ThreadSafeQueue`
instances are required.

Attributes of `ThreadSafeQueue`:
1. It is of fixed capacity defined on instantiation.
2. It uses a pre-allocated buffer of user-selectable type (`Queue` uses a
dynamically allocated `list`).
3. It is an asynchronous iterator allowing retrieval with `async for`.
4. It provides synchronous "put" and "get" methods. If the queue becomes full
(put) or empty (get), behaviour is user definable. The method either blocks or
See the note below re blocking methods.

Asynchronous methods:
* `put` Arg: the object to put on the queue. If the queue is full, it will
block until space is available.
* `get` No arg. Returns an object from the queue. If the queue is empty, it
will block until an object is put on the queue. Normal retrieval is with
`async for` but this method provides an alternative.

In use as a data consumer the `uasyncio` code will use `async for` to retrieve
items from the queue. If it is a data provider it will use `put` to place
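The async-iterator pattern this commit documents (`async for` delegating to an awaitable `get`) can be sketched with CPython's `asyncio`. The `TinyAsyncQueue` class below is an invented toy, not the library source; it only mirrors the `__aiter__`/`__anext__`/`get` relationship.

```python
import asyncio
from collections import deque

class TinyAsyncQueue:
    """Toy queue supporting both 'async for' and an awaitable get()."""
    def __init__(self):
        self._q = deque()
        self._evput = asyncio.Event()  # set whenever an item is put

    def put_sync(self, item):
        self._q.append(item)
        self._evput.set()

    async def get(self):
        while not self._q:             # wait until a producer puts something
            self._evput.clear()
            await self._evput.wait()
        return self._q.popleft()

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.get()        # async for delegates to get()

async def demo():
    q = TinyAsyncQueue()
    for n in (1, 2, 3):
        q.put_sync(n)
    out = []
    async for item in q:               # normal retrieval via async for
        out.append(item)
        if len(out) == 3:              # a real consumer would loop forever
            break
    return out

print(asyncio.run(demo()))
```

Because `__anext__` simply awaits `get`, either retrieval style can be used interchangeably, which is exactly the alternative the new `get` method offers.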
## 3.1 Blocking

These methods, called with `block=False`, produce an immediate return. To
avoid an `IndexError` the user should check for full or empty status before
calling.

The synchronous `get_sync` and `put_sync` methods have blocking modes invoked
by passing `block=True`. Blocking modes are primarily intended for use in the
non-`uasyncio` context. If invoked in a `uasyncio` task they must not be
allowed to block, because that would lock up the scheduler. Nor should they be
allowed to block in an ISR, where blocking can have unpredictable consequences.

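The check-before-call discipline for the non-blocking mode can be sketched as follows. `SyncQueueSketch` is an invented stand-in for illustration only (the real methods also take a `block` argument); it reproduces just the documented semantics: non-blocking get/put raise `IndexError` on an empty/full queue.

```python
from collections import deque

class SyncQueueSketch:
    """Invented sketch of non-blocking queue semantics (not the library)."""
    def __init__(self, size):
        self._q = deque()
        self._size = size

    def empty(self):
        return not self._q

    def full(self):
        return len(self._q) >= self._size

    def put_sync(self, val):
        if self.full():
            raise IndexError("queue full")
        self._q.append(val)

    def get_sync(self):
        if self.empty():
            raise IndexError("queue empty")
        return self._q.popleft()

q = SyncQueueSketch(2)
results = []
if not q.empty():          # check status before a non-blocking get...
    results.append(q.get_sync())
try:
    q.get_sync()           # ...otherwise an IndexError is raised
except IndexError:
    results.append("empty")
print(results)  # ['empty']
```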
## 3.2 Object ownership

Any Python object can be placed on a queue, but the user should be aware that
once the producer puts an object on the queue it loses ownership of the object
until the consumer has finished using it. In this sample the producer reads X,
Y and Z values from a sensor, puts them in a list or array and places the
object on a queue:
```python
def get_coordinates(q):
    while True:
        lst = [axis(0), axis(1), axis(2)]  # Read sensors and put into list
        q.put_sync(lst, block=True)
```
This is valid because a new list is created each time. The following will not
work:
```python
def get_coordinates(q):
    a = array.array("I", (0, 0, 0))
    while True:
        a[0], a[1], a[2] = axis(0), axis(1), axis(2)
        q.put_sync(a, block=True)  # Bug: the same array instance is re-used
```
The problem here is that the array is modified after being put on the queue. If
the queue is capable of holding 10 objects, 10 array instances are required.
Reusing objects requires the producer to be notified that the consumer has
finished with the item.

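One common notification scheme is a second queue used as a "free list": the consumer returns each buffer once it has finished with it, so the producer only re-uses buffers it owns. The sketch below uses CPython's stdlib `queue` for illustration (all names are invented; with the library one would use a second `ThreadSafeQueue` in the same way).

```python
import queue

data_q = queue.Queue()   # producer -> consumer
free_q = queue.Queue()   # consumer -> producer (returned buffers)
for _ in range(2):       # pre-allocate two reusable buffers
    free_q.put(bytearray(3))

def produce(reading):
    buf = free_q.get()             # take ownership of a free buffer
    buf[0:3] = bytes(reading)      # fill it with the new reading
    data_q.put(buf)                # ownership passes to the consumer

def consume():
    buf = data_q.get()
    result = list(buf)             # copy out what we need
    free_q.put(buf)                # hand ownership back to the producer
    return result

produce((1, 2, 3))
print(consume())  # [1, 2, 3]
```

With a fixed pool the producer can never scribble on a buffer the consumer is still reading, at the cost of blocking (or failing) when all buffers are in flight.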
## 3.3 A complete example

This demonstrates an echo server running on core 2. The `sender` task sends
consecutive integers to the server, which echoes them back on a second queue.

v3/threadsafe/threadsafe_queue.py

Lines changed: 3 additions & 0 deletions
```python
    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.get()

    async def get(self):
        while self.empty():
            await self._evput.wait()
        r = self._q[self._ri]
```

0 commit comments

Comments
 (0)