Skip to content

Commit 69f4d11

Browse files
authored
Merge pull request peterhinch#109 from stephanelsmith/master
Queue: Add task_done/join
2 parents 6e8d725 + 4554aff commit 69f4d11

File tree

3 files changed

+91
-0
lines changed

3 files changed

+91
-0
lines changed

v3/docs/TUTORIAL.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -973,12 +973,15 @@ Synchronous methods (immediate return):
973973
queue is full.
974974
* `get_nowait` No arg. Returns an object from the queue. Raises an exception
975975
if the queue is empty.
976+
* `task_done` No arg. Indicate that a task associated with a dequeued item is complete.
976977

977978
Asynchronous methods:
978979
* `put` Arg: the object to put on the queue. If the queue is full, it will
979980
block until space is available.
980981
* `get` No arg. Returns an object from the queue. If the queue is empty, it
981982
will block until an object is put on the queue.
983+
* `join` No arg. Block until all items in the queue have been received and
984+
processed (indicated via task_done).
982985

983986
```python
984987
import uasyncio as asyncio

v3/primitives/queue.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ def __init__(self, maxsize=0):
2626
self._evput = asyncio.Event() # Triggered by put, tested by get
2727
self._evget = asyncio.Event() # Triggered by get, tested by put
2828

29+
self._jncnt = 0
30+
self._jnevt = asyncio.Event()
31+
self._upd_jnevt(0) #update join event
32+
2933
def _get(self):
3034
self._evget.set() # Schedule all tasks waiting on get
3135
self._evget.clear()
@@ -45,6 +49,7 @@ def get_nowait(self): # Remove and return an item from the queue.
4549
return self._get()
4650

4751
def _put(self, val):
52+
self._upd_jnevt(1) # update join event
4853
self._evput.set() # Schedule tasks waiting on put
4954
self._evput.clear()
5055
self._queue.append(val)
@@ -71,3 +76,19 @@ def full(self): # Return True if there are maxsize items in the queue.
7176
# Note: if the Queue was initialized with maxsize=0 (the default) or
7277
# any negative number, then full() is never True.
7378
return self.maxsize > 0 and self.qsize() >= self.maxsize
79+
80+
81+
def _upd_jnevt(self, inc:int): # #Update join count and join event
82+
self._jncnt += inc
83+
if self._jncnt <= 0:
84+
self._jnevt.set()
85+
else:
86+
self._jnevt.clear()
87+
88+
def task_done(self): # Task Done decrements counter
89+
self._upd_jnevt(-1)
90+
91+
async def join(self): # Wait for join event
92+
await self._jnevt.wait()
93+
94+

v3/primitives/tests/asyntest.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def print_tests():
3434
test(7) Test the Condition class.
3535
test(8) Test the Queue class.
3636
test(9) Test the RingbufQueue class.
37+
test(10) Test the Queue task_done/join behavior.
3738
'''
3839
print('\x1b[32m')
3940
print(st)
@@ -636,6 +637,70 @@ def rbq_test():
636637
''', 6)
637638
asyncio.run(rbq_go())
638639

640+
641+
# ************ Queue task_done/join test ************
642+
async def q_task_done_join_consumer(q):
643+
while True:
644+
r = await q.get()
645+
print('consumer', 'got/processing {}'.format(r))
646+
await asyncio.sleep(.5)
647+
q.task_done()
648+
async def q_task_done_join_waiter(q):
649+
print('waiter','await q.join')
650+
await q.join()
651+
print('waiter','joined!', 'task done!')
652+
async def q_task_done_join_go():
653+
q = Queue()
654+
655+
#empty queue should not block join
656+
print('test', 'await empty q.join')
657+
await q.join()
658+
print('test', 'pass')
659+
660+
consumer_task = asyncio.create_task(q_task_done_join_consumer(q))
661+
waiter_task = asyncio.create_task(q_task_done_join_waiter(q))
662+
663+
#add jobs
664+
for x in range(10):
665+
await q.put(x)
666+
667+
print('test','await q.join')
668+
await q.join()
669+
print('test','all jobs done!')
670+
671+
await asyncio.sleep(0)
672+
print('test','waiter_task.done()?', waiter_task.done())
673+
674+
consumer_task.cancel()
675+
await asyncio.gather(consumer_task, return_exceptions=True)
676+
677+
print('test','DONE')
678+
679+
680+
def q_task_done_join_test():
681+
printexp('''Test Queue task_done/join behaviors
682+
test await empty q.join
683+
test pass
684+
test await q.join
685+
consumer got/processing 0
686+
waiter await q.join
687+
consumer got/processing 1
688+
consumer got/processing 2
689+
consumer got/processing 3
690+
consumer got/processing 4
691+
consumer got/processing 5
692+
consumer got/processing 6
693+
consumer got/processing 7
694+
consumer got/processing 8
695+
consumer got/processing 9
696+
test all jobs done!
697+
waiter joined! task done!
698+
test waiter_task.done()? True
699+
test DONE
700+
''', 5)
701+
asyncio.run(q_task_done_join_go())
702+
703+
639704
# ************ ************
640705
def test(n):
641706
try:
@@ -657,6 +722,8 @@ def test(n):
657722
queue_test() # Test the Queue class.
658723
elif n == 9:
659724
rbq_test() # Test the RingbufQueue class.
725+
elif n == 10:
726+
q_task_done_join_test() # Test the Queue task_done/join behavior.
660727
except KeyboardInterrupt:
661728
print('Interrupted')
662729
finally:

0 commit comments

Comments
 (0)