Skip to content

Commit 58d5a16

Browse files
committed
Code improvements to ThreadSafeQueue. Fix docs and code comments.
1 parent 500906b commit 58d5a16

File tree

2 files changed

+18
-25
lines changed

2 files changed

+18
-25
lines changed

v3/docs/EVENTS.md

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -536,9 +536,9 @@ except IndexError:
536536

537537
# 8. Threadsafe Queue
538538

539-
This queue is designed to interface between one or more `uasyncio` tasks and a
540-
single thread running in a different context. This can be an interrupt service
541-
routine (ISR) or code running in a different thread or on a different core.
539+
This queue is designed to interface between one `uasyncio` task and a single
540+
thread running in a different context. This can be an interrupt service routine
541+
(ISR), code running in a different thread or code on a different core.
542542

543543
Any Python object may be placed on a `ThreadSafeQueue`. If bi-directional
544544
communication is required between the two contexts, two `ThreadSafeQueue`
@@ -556,7 +556,7 @@ Constructor mandatory arg:
556556
* `buf` Buffer for the queue, e.g. list `[0 for _ in range(20)]` or array. A
557557
buffer of size `N` can hold a maximum of `N-1` items.
558558

559-
Synchronous methods (immediate return):
559+
Synchronous methods.
560560
* `qsize` No arg. Returns the number of items in the queue.
561561
* `empty` No arg. Returns `True` if the queue is empty.
562562
* `full` No arg. Returns `True` if the queue is full.
@@ -567,6 +567,9 @@ Synchronous methods (immediate return):
567567
`IndexError` if the queue is full unless `block==True` in which case the
568568
method blocks until the `uasyncio` tasks remove an item from the queue.
569569

570+
The blocking methods should not be used in the `uasyncio` context, because by
571+
blocking they will lock up the scheduler.
572+
570573
Asynchronous methods:
571574
* `put` Arg: the object to put on the queue. If the queue is full, it will
572575
block until space is available.
@@ -579,12 +582,8 @@ Data consumer:
579582
```python
580583
async def handle_queued_data(q):
581584
async for obj in q:
582-
await asyncio.sleep(0) # See below
583585
# Process obj
584586
```
585-
The `sleep` is necessary if you have multiple tasks waiting on the queue,
586-
otherwise one task hogs all the data.
587-
588587
Data provider:
589588
```python
590589
async def feed_queue(q):

v3/primitives/threadsafe_queue.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
# Uses pre-allocated ring buffer: can use list or array
77
# Asynchronous iterator allowing consumer to use async for
8-
# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded.
98

109
import uasyncio as asyncio
1110

@@ -29,41 +28,36 @@ def qsize(self):
2928
return (self._wi - self._ri) % self._size
3029

3130
def get_sync(self, block=False): # Remove and return an item from the queue.
32-
# Return an item if one is immediately available, else raise QueueEmpty.
33-
if block:
34-
while self.empty():
35-
pass
36-
else:
37-
if self.empty():
38-
raise IndexError
31+
if not block and self.empty():
32+
raise IndexError # Not allowed to block
33+
while self.empty(): # Block until an item appears
34+
pass
3935
r = self._q[self._ri]
4036
self._ri = (self._ri + 1) % self._size
4137
self._evget.set()
4238
return r
4339

4440
def put_sync(self, v, block=False):
4541
self._q[self._wi] = v
46-
self._evput.set() # Schedule any tasks waiting on get
47-
if block:
48-
while ((self._wi + 1) % self._size) == self._ri:
49-
pass # can't bump ._wi until an item is removed
50-
elif ((self._wi + 1) % self._size) == self._ri:
42+
self._evput.set() # Schedule task waiting on get
43+
if not block and self.full():
5144
raise IndexError
45+
while self.full():
46+
pass # can't bump ._wi until an item is removed
5247
self._wi = (self._wi + 1) % self._size
5348

5449
async def put(self, val): # Usage: await queue.put(item)
5550
while self.full(): # Queue full
56-
await self._evget.wait() # May be >1 task waiting on ._evget
57-
# Task(s) waiting to get from queue, schedule first Task
51+
await self._evget.wait()
5852
self.put_sync(val)
5953

6054
def __aiter__(self):
6155
return self
6256

6357
async def __anext__(self):
64-
while self.empty(): # Empty. May be more than one task waiting on ._evput
58+
while self.empty():
6559
await self._evput.wait()
6660
r = self._q[self._ri]
6761
self._ri = (self._ri + 1) % self._size
68-
self._evget.set() # Schedule all tasks waiting on ._evget
62+
self._evget.set() # Schedule task waiting on ._evget
6963
return r

0 commit comments

Comments
 (0)