Skip to content

Commit 07108cc

Browse files
queue: task_done/join behavior updated for initial empty queue
1 parent 986b5c3 commit 07108cc

File tree

2 files changed

+28
-21
lines changed

2 files changed

+28
-21
lines changed

v3/primitives/queue.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def __init__(self, maxsize=0):
2828

2929
self._jncnt = 0
3030
self._jnevt = asyncio.Event()
31+
self._upd_jnevt(0) #update join event
3132

3233
def _get(self):
3334
self._evget.set() # Schedule all tasks waiting on get
@@ -48,7 +49,7 @@ def get_nowait(self): # Remove and return an item from the queue.
4849
return self._get()
4950

5051
def _put(self, val):
51-
self._jncnt += 1
52+
self._upd_jnevt(1) # update join event
5253
self._evput.set() # Schedule tasks waiting on put
5354
self._evput.clear()
5455
self._queue.append(val)
@@ -76,14 +77,18 @@ def full(self): # Return True if there are maxsize items in the queue.
7677
# any negative number, then full() is never True.
7778
return self.maxsize > 0 and self.qsize() >= self.maxsize
7879

79-
def task_done(self):
80-
self._jncnt -= 1
80+
81+
def _upd_jnevt(self, inc:int): # #Update join count and join event
82+
self._jncnt += inc
8183
if self._jncnt <= 0:
8284
self._jnevt.set()
8385
else:
8486
self._jnevt.clear()
8587

86-
async def join(self):
88+
def task_done(self): # Task Done decrements counter
89+
self._upd_jnevt(-1)
90+
91+
async def join(self): # Wait for join event
8792
await self._jnevt.wait()
8893

8994

v3/primitives/tests/asyntest.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -645,29 +645,31 @@ async def q_task_done_join_consumer(q):
645645
print('consumer', 'got/processing {}'.format(r))
646646
await asyncio.sleep(.5)
647647
q.task_done()
648-
async def q_task_done_join_producer(q):
649-
print('producer','loading jobs')
650-
for x in range(10):
651-
await q.put(x)
652-
print('producer','await q.join')
648+
async def q_task_done_join_waiter(q):
649+
print('waiter','await q.join')
653650
await q.join()
654-
print('producer','joined!', 'task done!')
651+
print('waiter','joined!', 'task done!')
655652
async def q_task_done_join_go():
656653
q = Queue()
657654

655+
#empty queue should not block join
656+
print('test', 'await empty q.join')
657+
await q.join()
658+
print('test', 'pass')
659+
658660
consumer_task = asyncio.create_task(q_task_done_join_consumer(q))
659-
producer_task = asyncio.create_task(q_task_done_join_producer(q))
660-
await asyncio.sleep(0)
661+
waiter_task = asyncio.create_task(q_task_done_join_waiter(q))
662+
663+
#add jobs
664+
for x in range(10):
665+
await q.put(x)
661666

662667
print('test','await q.join')
663668
await q.join()
664669
print('test','all jobs done!')
665670

666-
print('test','join again')
667-
await q.join()
668-
669671
await asyncio.sleep(0)
670-
print('test','producer_task.done()?', producer_task.done())
672+
print('test','waiter_task.done()?', waiter_task.done())
671673

672674
consumer_task.cancel()
673675
await asyncio.gather(consumer_task, return_exceptions=True)
@@ -677,10 +679,11 @@ async def q_task_done_join_go():
677679

678680
def q_task_done_join_test():
679681
printexp('''Test Queue task_done/join behaviors
680-
producer loading jobs
681-
producer await q.join
682+
test await empty q.join
683+
test pass
682684
test await q.join
683685
consumer got/processing 0
686+
waiter await q.join
684687
consumer got/processing 1
685688
consumer got/processing 2
686689
consumer got/processing 3
@@ -690,10 +693,9 @@ def q_task_done_join_test():
690693
consumer got/processing 7
691694
consumer got/processing 8
692695
consumer got/processing 9
693-
producer joined! task done!
694696
test all jobs done!
695-
test join again
696-
test producer_task.done()? True
697+
waiter joined! task done!
698+
test waiter_task.done()? True
697699
test DONE
698700
''', 5)
699701
asyncio.run(q_task_done_join_go())

0 commit comments

Comments
 (0)