Skip to content

Commit 986b5c3

Browse files
queue: added task_done/join behavior and test (asynctest)
1 parent 6e8d725 commit 986b5c3

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

v3/primitives/queue.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ def __init__(self, maxsize=0):
2626
self._evput = asyncio.Event() # Triggered by put, tested by get
2727
self._evget = asyncio.Event() # Triggered by get, tested by put
2828

29+
self._jncnt = 0
30+
self._jnevt = asyncio.Event()
31+
2932
def _get(self):
3033
self._evget.set() # Schedule all tasks waiting on get
3134
self._evget.clear()
@@ -45,6 +48,7 @@ def get_nowait(self): # Remove and return an item from the queue.
4548
return self._get()
4649

4750
def _put(self, val):
51+
self._jncnt += 1
4852
self._evput.set() # Schedule tasks waiting on put
4953
self._evput.clear()
5054
self._queue.append(val)
@@ -71,3 +75,15 @@ def full(self): # Return True if there are maxsize items in the queue.
7175
# Note: if the Queue was initialized with maxsize=0 (the default) or
7276
# any negative number, then full() is never True.
7377
return self.maxsize > 0 and self.qsize() >= self.maxsize
78+
79+
def task_done(self):
80+
self._jncnt -= 1
81+
if self._jncnt <= 0:
82+
self._jnevt.set()
83+
else:
84+
self._jnevt.clear()
85+
86+
async def join(self):
87+
await self._jnevt.wait()
88+
89+

v3/primitives/tests/asyntest.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def print_tests():
3434
test(7) Test the Condition class.
3535
test(8) Test the Queue class.
3636
test(9) Test the RingbufQueue class.
37+
test(10) Test the Queue task_done/join behavior.
3738
'''
3839
print('\x1b[32m')
3940
print(st)
@@ -636,6 +637,68 @@ def rbq_test():
636637
''', 6)
637638
asyncio.run(rbq_go())
638639

640+
641+
# ************ Queue task_done/join test ************
642+
async def q_task_done_join_consumer(q):
643+
while True:
644+
r = await q.get()
645+
print('consumer', 'got/processing {}'.format(r))
646+
await asyncio.sleep(.5)
647+
q.task_done()
648+
async def q_task_done_join_producer(q):
649+
print('producer','loading jobs')
650+
for x in range(10):
651+
await q.put(x)
652+
print('producer','await q.join')
653+
await q.join()
654+
print('producer','joined!', 'task done!')
655+
async def q_task_done_join_go():
656+
q = Queue()
657+
658+
consumer_task = asyncio.create_task(q_task_done_join_consumer(q))
659+
producer_task = asyncio.create_task(q_task_done_join_producer(q))
660+
await asyncio.sleep(0)
661+
662+
print('test','await q.join')
663+
await q.join()
664+
print('test','all jobs done!')
665+
666+
print('test','join again')
667+
await q.join()
668+
669+
await asyncio.sleep(0)
670+
print('test','producer_task.done()?', producer_task.done())
671+
672+
consumer_task.cancel()
673+
await asyncio.gather(consumer_task, return_exceptions=True)
674+
675+
print('test','DONE')
676+
677+
678+
def q_task_done_join_test():
679+
printexp('''Test Queue task_done/join behaviors
680+
producer loading jobs
681+
producer await q.join
682+
test await q.join
683+
consumer got/processing 0
684+
consumer got/processing 1
685+
consumer got/processing 2
686+
consumer got/processing 3
687+
consumer got/processing 4
688+
consumer got/processing 5
689+
consumer got/processing 6
690+
consumer got/processing 7
691+
consumer got/processing 8
692+
consumer got/processing 9
693+
producer joined! task done!
694+
test all jobs done!
695+
test join again
696+
test producer_task.done()? True
697+
test DONE
698+
''', 5)
699+
asyncio.run(q_task_done_join_go())
700+
701+
639702
# ************ ************
640703
def test(n):
641704
try:
@@ -657,6 +720,8 @@ def test(n):
657720
queue_test() # Test the Queue class.
658721
elif n == 9:
659722
rbq_test() # Test the RingbufQueue class.
723+
elif n == 10:
724+
q_task_done_join_test() # Test the Queue task_done/join behavior.
660725
except KeyboardInterrupt:
661726
print('Interrupted')
662727
finally:

0 commit comments

Comments
 (0)