Skip to content

Commit 500906b

Browse files
committed
Fix RingbufQueue bug. Add ThreadSafeQueue.
1 parent 64ef6e5 commit 500906b

File tree

4 files changed

+155
-0
lines changed

4 files changed

+155
-0
lines changed

v3/docs/EVENTS.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,85 @@ except IndexError:
534534
pass
535535
```
536536

537+
# 8. Threadsafe Queue
538+
539+
This queue is designed to interface between one or more `uasyncio` tasks and a
540+
single thread running in a different context. This can be an interrupt service
541+
routine (ISR) or code running in a different thread or on a different core.
542+
543+
Any Python object may be placed on a `ThreadSafeQueue`. If bi-directional
544+
communication is required between the two contexts, two `ThreadSafeQueue`
545+
instances are required.
546+
547+
Attributes of `ThreadSafeQueue`:
548+
1. It is of fixed size defined on instantiation.
549+
2. It uses pre-allocated buffers of various types (`Queue` uses a `list`).
550+
3. It is an asynchronous iterator allowing retrieval with `async for`.
551+
4. It provides synchronous "put" and "get" methods. If the queue becomes full
552+
(put) or empty (get), behaviour is user definable. The method either blocks or
553+
raises an `IndexError`.
554+
555+
Constructor mandatory arg:
556+
* `buf` Buffer for the queue, e.g. list `[0 for _ in range(20)]` or array. A
557+
buffer of size `N` can hold a maximum of `N-1` items.
558+
559+
Synchronous methods (immediate return):
560+
* `qsize` No arg. Returns the number of items in the queue.
561+
* `empty` No arg. Returns `True` if the queue is empty.
562+
* `full` No arg. Returns `True` if the queue is full.
563+
* `get_sync` Arg `block=False`. Returns an object from the queue. Raises
564+
`IndexError` if the queue is empty, unless `block==True` in which case the
565+
method blocks until the `uasyncio` tasks put an item on the queue.
566+
* `put_sync` Args: the object to put on the queue, `block=False`. Raises
567+
`IndexError` if the queue is full unless `block==True` in which case the
568+
method blocks until the `uasyncio` tasks remove an item from the queue.
569+
570+
Asynchronous methods:
571+
* `put` Arg: the object to put on the queue. If the queue is full, it will
572+
block until space is available.
573+
574+
In use as a data consumer the `uasyncio` code will use `async for` to retrieve
575+
items from the queue. If it is a data provider it will use `put` to place
576+
objects on the queue.
577+
578+
Data consumer:
579+
```python
580+
async def handle_queued_data(q):
581+
async for obj in q:
582+
await asyncio.sleep(0) # See below
583+
# Process obj
584+
```
585+
The `sleep` is necessary if you have multiple tasks waiting on the queue,
586+
otherwise one task hogs all the data.
587+
588+
Data provider:
589+
```python
590+
async def feed_queue(q):
591+
while True:
592+
data = await data_source()
593+
await q.put(data)
594+
```
595+
The alternate thread will use synchronous methods.
596+
597+
Data provider (throw if full):
598+
```python
599+
while True:
600+
data = data_source()
601+
try:
602+
q.put_sync(data)
603+
except IndexError:
604+
# Queue is full
605+
```
606+
Data consumer (block while empty):
607+
```python
608+
while True:
609+
data = q.get(block=True) # May take a while if the uasyncio side is slow
610+
process(data) # Do something with it
611+
```
612+
Note that where the alternate thread is an ISR it is very bad practice to allow
613+
blocking. The application should be designed in such a way that the full/empty
614+
case does not occur.
615+
537616
###### [Contents](./EVENTS.md#0-contents)
538617

539618
# 100 Appendix 1 Polling

v3/primitives/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def _handle_exception(loop, context):
4848
"ESwitch": "events",
4949
"EButton": "events",
5050
"RingbufQueue": "ringbuf_queue",
51+
"ThreadSafeQueue": "threadsafe_queue",
5152
}
5253

5354
# Copied from uasyncio.__init__.py

v3/primitives/ringbuf_queue.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
# ringbuf_queue.py Provides RingbufQueue class
2+
3+
# Copyright (c) 2022 Peter Hinch
4+
# Released under the MIT License (MIT) - see LICENSE file
5+
26
# API differs from CPython
37
# Uses pre-allocated ring buffer: can use list or array
48
# Asynchronous iterator allowing consumer to use async for
@@ -31,6 +35,8 @@ def get_nowait(self): # Remove and return an item from the queue.
3135
raise IndexError
3236
r = self._q[self._ri]
3337
self._ri = (self._ri + 1) % self._size
38+
self._evget.set() # Schedule all tasks waiting on ._evget
39+
self._evget.clear()
3440
return r
3541

3642
def put_nowait(self, v):

v3/primitives/threadsafe_queue.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# threadsafe_queue.py Provides ThreadsafeQueue class
2+
3+
# Copyright (c) 2022 Peter Hinch
4+
# Released under the MIT License (MIT) - see LICENSE file
5+
6+
# Uses pre-allocated ring buffer: can use list or array
7+
# Asynchronous iterator allowing consumer to use async for
8+
# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded.
9+
10+
import uasyncio as asyncio
11+
12+
13+
class ThreadSafeQueue: # MicroPython optimised
14+
def __init__(self, buf):
15+
self._q = buf
16+
self._size = len(buf)
17+
self._wi = 0
18+
self._ri = 0
19+
self._evput = asyncio.ThreadSafeFlag() # Triggered by put, tested by get
20+
self._evget = asyncio.ThreadSafeFlag() # Triggered by get, tested by put
21+
22+
def full(self):
23+
return ((self._wi + 1) % self._size) == self._ri
24+
25+
def empty(self):
26+
return self._ri == self._wi
27+
28+
def qsize(self):
29+
return (self._wi - self._ri) % self._size
30+
31+
def get_sync(self, block=False): # Remove and return an item from the queue.
32+
# Return an item if one is immediately available, else raise QueueEmpty.
33+
if block:
34+
while self.empty():
35+
pass
36+
else:
37+
if self.empty():
38+
raise IndexError
39+
r = self._q[self._ri]
40+
self._ri = (self._ri + 1) % self._size
41+
self._evget.set()
42+
return r
43+
44+
def put_sync(self, v, block=False):
45+
self._q[self._wi] = v
46+
self._evput.set() # Schedule any tasks waiting on get
47+
if block:
48+
while ((self._wi + 1) % self._size) == self._ri:
49+
pass # can't bump ._wi until an item is removed
50+
elif ((self._wi + 1) % self._size) == self._ri:
51+
raise IndexError
52+
self._wi = (self._wi + 1) % self._size
53+
54+
async def put(self, val): # Usage: await queue.put(item)
55+
while self.full(): # Queue full
56+
await self._evget.wait() # May be >1 task waiting on ._evget
57+
# Task(s) waiting to get from queue, schedule first Task
58+
self.put_sync(val)
59+
60+
def __aiter__(self):
61+
return self
62+
63+
async def __anext__(self):
64+
while self.empty(): # Empty. May be more than one task waiting on ._evput
65+
await self._evput.wait()
66+
r = self._q[self._ri]
67+
self._ri = (self._ri + 1) % self._size
68+
self._evget.set() # Schedule all tasks waiting on ._evget
69+
return r

0 commit comments

Comments
 (0)