Skip to content

Commit df4d84c

Browse files
EpicWinkYvesDup
andauthored
gh-96471: Add asyncio queue shutdown (#104228)
Co-authored-by: Duprat <[email protected]>
1 parent 1d3225a commit df4d84c

File tree

5 files changed

+301
-3
lines changed

5 files changed

+301
-3
lines changed

Doc/library/asyncio-queue.rst

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ Queue
6262
Remove and return an item from the queue. If queue is empty,
6363
wait until an item is available.
6464

65+
Raises :exc:`QueueShutDown` if the queue has been shut down and
66+
is empty, or if the queue has been shut down immediately.
67+
6568
.. method:: get_nowait()
6669

6770
Return an item if one is immediately available, else raise
@@ -82,6 +85,8 @@ Queue
8285
Put an item into the queue. If the queue is full, wait until a
8386
free slot is available before adding the item.
8487

88+
Raises :exc:`QueueShutDown` if the queue has been shut down.
89+
8590
.. method:: put_nowait(item)
8691

8792
Put an item into the queue without blocking.
@@ -92,6 +97,21 @@ Queue
9297

9398
Return the number of items in the queue.
9499

100+
.. method:: shutdown(immediate=False)
101+
102+
Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put`
103+
raise :exc:`QueueShutDown`.
104+
105+
By default, :meth:`~Queue.get` on a shut down queue will only
106+
raise once the queue is empty. Set *immediate* to true to make
107+
:meth:`~Queue.get` raise immediately instead.
108+
109+
All blocked callers of :meth:`~Queue.put` will be unblocked. If
110+
*immediate* is true, also unblock callers of :meth:`~Queue.get`
111+
and :meth:`~Queue.join`.
112+
113+
.. versionadded:: 3.13
114+
95115
.. method:: task_done()
96116

97117
Indicate that a formerly enqueued task is complete.
@@ -105,6 +125,9 @@ Queue
105125
call was received for every item that had been :meth:`~Queue.put`
106126
into the queue).
107127

128+
``shutdown(immediate=True)`` calls :meth:`task_done` for each
129+
remaining item in the queue.
130+
108131
Raises :exc:`ValueError` if called more times than there were
109132
items placed in the queue.
110133

@@ -145,6 +168,14 @@ Exceptions
145168
on a queue that has reached its *maxsize*.
146169

147170

171+
.. exception:: QueueShutDown
172+
173+
Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is
174+
called on a queue which has been shut down.
175+
176+
.. versionadded:: 3.13
177+
178+
148179
Examples
149180
========
150181

Doc/whatsnew/3.13.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,10 @@ asyncio
296296
with the tasks being completed.
297297
(Contributed by Justin Arthur in :gh:`77714`.)
298298

299+
* Add :meth:`asyncio.Queue.shutdown` (along with
300+
:exc:`asyncio.QueueShutDown`) for queue termination.
301+
(Contributed by Laurie Opperman in :gh:`104228`.)
302+
299303
base64
300304
------
301305

Lib/asyncio/queues.py

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
1+
__all__ = (
2+
'Queue',
3+
'PriorityQueue',
4+
'LifoQueue',
5+
'QueueFull',
6+
'QueueEmpty',
7+
'QueueShutDown',
8+
)
29

310
import collections
411
import heapq
@@ -18,6 +25,11 @@ class QueueFull(Exception):
1825
pass
1926

2027

28+
class QueueShutDown(Exception):
29+
"""Raised when putting on to or getting from a shut-down Queue."""
30+
pass
31+
32+
2133
class Queue(mixins._LoopBoundMixin):
2234
"""A queue, useful for coordinating producer and consumer coroutines.
2335
@@ -41,6 +53,7 @@ def __init__(self, maxsize=0):
4153
self._finished = locks.Event()
4254
self._finished.set()
4355
self._init(maxsize)
56+
self._is_shutdown = False
4457

4558
# These three are overridable in subclasses.
4659

@@ -81,6 +94,8 @@ def _format(self):
8194
result += f' _putters[{len(self._putters)}]'
8295
if self._unfinished_tasks:
8396
result += f' tasks={self._unfinished_tasks}'
97+
if self._is_shutdown:
98+
result += ' shutdown'
8499
return result
85100

86101
def qsize(self):
@@ -112,8 +127,12 @@ async def put(self, item):
112127
113128
Put an item into the queue. If the queue is full, wait until a free
114129
slot is available before adding item.
130+
131+
Raises QueueShutDown if the queue has been shut down.
115132
"""
116133
while self.full():
134+
if self._is_shutdown:
135+
raise QueueShutDown
117136
putter = self._get_loop().create_future()
118137
self._putters.append(putter)
119138
try:
@@ -125,7 +144,7 @@ async def put(self, item):
125144
self._putters.remove(putter)
126145
except ValueError:
127146
# The putter could be removed from self._putters by a
128-
# previous get_nowait call.
147+
# previous get_nowait call or a shutdown call.
129148
pass
130149
if not self.full() and not putter.cancelled():
131150
# We were woken up by get_nowait(), but can't take
@@ -138,7 +157,11 @@ def put_nowait(self, item):
138157
"""Put an item into the queue without blocking.
139158
140159
If no free slot is immediately available, raise QueueFull.
160+
161+
Raises QueueShutDown if the queue has been shut down.
141162
"""
163+
if self._is_shutdown:
164+
raise QueueShutDown
142165
if self.full():
143166
raise QueueFull
144167
self._put(item)
@@ -150,8 +173,13 @@ async def get(self):
150173
"""Remove and return an item from the queue.
151174
152175
If queue is empty, wait until an item is available.
176+
177+
Raises QueueShutDown if the queue has been shut down and is empty, or
178+
if the queue has been shut down immediately.
153179
"""
154180
while self.empty():
181+
if self._is_shutdown and self.empty():
182+
raise QueueShutDown
155183
getter = self._get_loop().create_future()
156184
self._getters.append(getter)
157185
try:
@@ -163,7 +191,7 @@ async def get(self):
163191
self._getters.remove(getter)
164192
except ValueError:
165193
# The getter could be removed from self._getters by a
166-
# previous put_nowait call.
194+
# previous put_nowait call, or a shutdown call.
167195
pass
168196
if not self.empty() and not getter.cancelled():
169197
# We were woken up by put_nowait(), but can't take
@@ -176,8 +204,13 @@ def get_nowait(self):
176204
"""Remove and return an item from the queue.
177205
178206
Return an item if one is immediately available, else raise QueueEmpty.
207+
208+
Raises QueueShutDown if the queue has been shut down and is empty, or
209+
if the queue has been shut down immediately.
179210
"""
180211
if self.empty():
212+
if self._is_shutdown:
213+
raise QueueShutDown
181214
raise QueueEmpty
182215
item = self._get()
183216
self._wakeup_next(self._putters)
@@ -194,6 +227,9 @@ def task_done(self):
194227
been processed (meaning that a task_done() call was received for every
195228
item that had been put() into the queue).
196229
230+
shutdown(immediate=True) calls task_done() for each remaining item in
231+
the queue.
232+
197233
Raises ValueError if called more times than there were items placed in
198234
the queue.
199235
"""
@@ -214,6 +250,32 @@ async def join(self):
214250
if self._unfinished_tasks > 0:
215251
await self._finished.wait()
216252

253+
def shutdown(self, immediate=False):
254+
"""Shut-down the queue, making queue gets and puts raise QueueShutDown.
255+
256+
By default, gets will only raise once the queue is empty. Set
257+
'immediate' to True to make gets raise immediately instead.
258+
259+
All blocked callers of put() will be unblocked, and also get()
260+
and join() if 'immediate'.
261+
"""
262+
self._is_shutdown = True
263+
if immediate:
264+
while not self.empty():
265+
self._get()
266+
if self._unfinished_tasks > 0:
267+
self._unfinished_tasks -= 1
268+
if self._unfinished_tasks == 0:
269+
self._finished.set()
270+
while self._getters:
271+
getter = self._getters.popleft()
272+
if not getter.done():
273+
getter.set_result(None)
274+
while self._putters:
275+
putter = self._putters.popleft()
276+
if not putter.done():
277+
putter.set_result(None)
278+
217279

218280
class PriorityQueue(Queue):
219281
"""A subclass of Queue; retrieves entries in priority order (lowest first).

0 commit comments

Comments
 (0)