Skip to content

gh-96471: Add queue shutdown, next step. #102499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 74 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
4fd0640
Add threading implementation of queue shutdown
EpicWink Sep 1, 2022
d942c9e
Fix up implementation, add unit-tests
EpicWink Sep 18, 2022
f552ac1
Implement for asyncio queues
EpicWink Sep 18, 2022
78671f9
WIP: multiprocessing queue shutdown
EpicWink Oct 22, 2022
5f31f8e
WIP: multiprocessing queue shutdown
EpicWink Jan 19, 2023
9bbc5db
change comment
YvesDup Feb 10, 2023
f9f2c06
call to self._finished.set() in order to release all joined tasks/coros
YvesDup Feb 10, 2023
7491ef1
add unitests to `shutdwon` method
YvesDup Feb 10, 2023
dd22c6b
add unitests to `shutdwon` method
YvesDup Feb 10, 2023
5239306
replace global state variable with an enum `_QueueState`
YvesDup Feb 10, 2023
4b127b6
replace global state variable with an enum `_QueueState` - erase E ju…
YvesDup Feb 10, 2023
6402de7
simplify and unify tests
YvesDup Feb 11, 2023
be9588b
simplify and unify tests
YvesDup Feb 11, 2023
3613f5d
add `_shutdown_state` to tuples of `__getstate__` and `__setstate__`,…
YvesDup Feb 13, 2023
06775bb
Update initial tests with `self.assertRaises`
YvesDup Feb 15, 2023
6f01015
integration of shudown transition in `shutdown` method
YvesDup Feb 15, 2023
ff9895d
Set `test_shutdown` prefix to all unittests
YvesDup Feb 15, 2023
d42433e
asyncio.queue: refactoring of tests, add new tests, last updates and…
YvesDup Feb 28, 2023
53078bb
first version working
YvesDup Feb 28, 2023
0075039
Some corrections
YvesDup Feb 28, 2023
b4a53d2
Change Enum to global about `shutdown_state` attr
YvesDup Mar 3, 2023
dbe2078
Add new tests
YvesDup Mar 3, 2023
18bb995
Fixes bugs
YvesDup Mar 3, 2023
05700d5
Add first tests
YvesDup Mar 3, 2023
7d01747
Update tests
YvesDup Mar 3, 2023
aad0cba
add _wait()
YvesDup Mar 3, 2023
d9dbb33
Move some tests about shutdwon state and empty queue
YvesDup Mar 3, 2023
9f4c0a3
Merge branch 'python:main' into queue-shutdown
YvesDup Mar 4, 2023
db6a257
📜🤖 Added by blurb_it.
blurb-it[bot] Mar 7, 2023
01e5880
Update 2023-03-07-15-42-27.gh-issue-96471.oWZtwQ.rst
YvesDup Mar 7, 2023
88627bb
Merge branch 'main' into queue-shutdown
YvesDup Mar 7, 2023
37da705
Update test _shutdown_all_methods
YvesDup Mar 7, 2023
795fb2d
Fix some bugs, Refactoring code
YvesDup Mar 8, 2023
0de7836
Merge branch 'main' into queue-shutdown
YvesDup Mar 9, 2023
4443237
Merge branch 'main' into queue-shutdown
YvesDup Mar 15, 2023
499157d
Merge branch 'main' into queue-shutdown
AlexWaygood Mar 15, 2023
99eadd7
Update test names
YvesDup Mar 15, 2023
670d864
Refactoring and fix minor bugs
YvesDup Mar 15, 2023
6af0e8e
suppress import enum
YvesDup Mar 15, 2023
a3e03c5
update docstrings
YvesDup Mar 17, 2023
bc30db7
Suppress `import ctypes`, causes no necessary uses
YvesDup Mar 17, 2023
64defd4
Merge branch 'main' into queue-shutdown
YvesDup Mar 19, 2023
4382409
fix segmentation fault: use ctx.Value,
YvesDup Mar 22, 2023
aa70dc2
fix segmentation fault:
YvesDup Mar 22, 2023
cbfd771
Add private method about shutdown_state
YvesDup Mar 22, 2023
0d095bc
Add private methods to check _shutdown_state attr
YvesDup Mar 24, 2023
6d1f072
Merge branch 'main' into queue-shutdown
YvesDup Mar 24, 2023
891cffe
Update docs for queue shutdown
EpicWink Mar 28, 2023
e30933f
Update Lib/queue.py
YvesDup Mar 29, 2023
de5714d
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
da2e3c7
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
50857c0
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
56272b9
Update Lib/queue.py
YvesDup Mar 29, 2023
db7eaff
Update Lib/queue.py
YvesDup Mar 29, 2023
4099fb8
Update Lib/multiprocessing/queues.py
YvesDup Mar 29, 2023
58007dc
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
225387f
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
7c6e1c7
Update Lib/queue.py
YvesDup Mar 29, 2023
c025766
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
c9ae3be
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
e9b66b2
Update some tests
YvesDup Apr 6, 2023
570158e
Suppress a borderline assert
YvesDup Apr 6, 2023
eeb47b0
Fix bugs
YvesDup Apr 6, 2023
6f0b131
Merge branch 'main' into queue-shutdown
YvesDup Apr 7, 2023
4e2a19e
ran patchcheck
YvesDup Apr 7, 2023
6926e11
remove ./Tools/c-analyzer/cpython/_parser.py
YvesDup Apr 7, 2023
d301c90
Merge branch 'main' into queue-shutdown
arhadthedev Apr 10, 2023
49879a0
Merge branch 'main' into queue-shutdown
YvesDup Apr 11, 2023
31ea16b
Add `shutdown` method documentation
YvesDup May 5, 2023
bed3a4b
Add `shutdown` method to documentation
YvesDup May 5, 2023
8e8dcfa
Add `shutdown` method to documentation
YvesDup May 5, 2023
9eed14e
Add `shutdown` method to documentation
YvesDup May 5, 2023
66efd9c
Merge pull request #4 from EpicWink/yvesdup-queue-shutdown-2
YvesDup May 5, 2023
f4ad064
Merge branch 'main' into queue-shutdown
YvesDup May 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Doc/library/asyncio-queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ Queue
Remove and return an item from the queue. If queue is empty,
wait until an item is available.

Raises :exc:`QueueShutDown` if the queue has been shut down and
is empty, or if the queue has been shut down immediately.

.. method:: get_nowait()

Return an item if one is immediately available, else raise
Expand All @@ -77,11 +80,16 @@ Queue
work on it is complete. When the count of unfinished tasks drops
to zero, :meth:`join` unblocks.

Raises :exc:`QueueShutDown` if the queue has been shut down
immediately.

.. coroutinemethod:: put(item)

Put an item into the queue. If the queue is full, wait until a
free slot is available before adding the item.

Raises :exc:`QueueShutDown` if the queue has been shut down.

.. method:: put_nowait(item)

Put an item into the queue without blocking.
Expand All @@ -92,6 +100,19 @@ Queue

Return the number of items in the queue.

.. method:: shutdown(immediate=False)

Shut-down the queue, making queue gets and puts raise
:exc:`QueueShutDown`.

By default, gets will only raise once the queue is empty. Set
*immediate* to true to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if *immediate* is true.

.. versionadded:: 3.13

.. method:: task_done()

Indicate that a formerly enqueued task is complete.
Expand All @@ -108,6 +129,9 @@ Queue
Raises :exc:`ValueError` if called more times than there were
items placed in the queue.

Raises :exc:`QueueShutDown` if the queue has been shut down
immediately.


Priority Queue
==============
Expand Down Expand Up @@ -145,6 +169,14 @@ Exceptions
on a queue that has reached its *maxsize*.


.. exception:: QueueShutDown

Exception raised when getting an item from or putting an item onto a
queue which has been shut down.

.. versionadded:: 3.13


Examples
========

Expand Down
24 changes: 22 additions & 2 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,8 @@ For an example of the usage of queues for interprocess communication see
free slot was available within that time. Otherwise (*block* is
``False``), put an item on the queue if a free slot is immediately
available, else raise the :exc:`queue.Full` exception (*timeout* is
ignored in that case).
ignored in that case). Raises :exc:`ShutDown` if the queue has been shut
down.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -863,7 +864,9 @@ For an example of the usage of queues for interprocess communication see
it blocks at most *timeout* seconds and raises the :exc:`queue.Empty`
exception if no item was available within that time. Otherwise (block is
``False``), return an item if one is immediately available, else raise the
:exc:`queue.Empty` exception (*timeout* is ignored in that case).
:exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises
:exc:`queue.ShutDown` if the queue has been shut down and is empty, or if
the queue has been shut down immediately.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -873,6 +876,19 @@ For an example of the usage of queues for interprocess communication see

Equivalent to ``get(False)``.

.. method:: shutdown(immediate=False)

Shut-down the queue, making queue gets and puts raise
:exc:`queue.ShutDown`.

By default, gets will only raise once the queue is empty. Set
*immediate* to true to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if *immediate* is true.

.. versionadded:: 3.13

:class:`multiprocessing.Queue` has a few additional methods not found in
:class:`queue.Queue`. These methods are usually unnecessary for most
code:
Expand Down Expand Up @@ -962,6 +978,8 @@ For an example of the usage of queues for interprocess communication see
Raises a :exc:`ValueError` if called more times than there were items
placed in the queue.

Raises :exc:`queue.ShutDown` if the queue has been shut down immediately.


.. method:: join()

Expand All @@ -973,6 +991,8 @@ For an example of the usage of queues for interprocess communication see
it is complete. When the count of unfinished tasks drops to zero,
:meth:`~queue.Queue.join` unblocks.

Raises :exc:`queue.ShutDown` if the queue has been shut down immediately.


Miscellaneous
~~~~~~~~~~~~~
Expand Down
36 changes: 36 additions & 0 deletions Doc/library/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ The :mod:`queue` module defines the following classes and exceptions:
on a :class:`Queue` object which is full.


.. exception:: ShutDown

Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on
a :class:`Queue` object which has been shut down.

.. versionadded:: 3.13


.. _queueobjects:

Queue Objects
Expand Down Expand Up @@ -135,6 +143,8 @@ provide the public methods described below.
immediately available, else raise the :exc:`Full` exception (*timeout* is
ignored in that case).

Raises :exc:`ShutDown` if the queue has been shut down.


.. method:: Queue.put_nowait(item)

Expand All @@ -155,6 +165,9 @@ provide the public methods described below.
an uninterruptible wait on an underlying lock. This means that no exceptions
can occur, and in particular a SIGINT will not trigger a :exc:`KeyboardInterrupt`.

Raises :exc:`ShutDown` if the queue has been shut down and is empty, or if
the queue has been shut down immediately.


.. method:: Queue.get_nowait()

Expand All @@ -177,6 +190,8 @@ fully processed by daemon consumer threads.
Raises a :exc:`ValueError` if called more times than there were items placed in
the queue.

Raises :exc:`ShutDown` if the queue has been shut down immediately.


.. method:: Queue.join()

Expand All @@ -187,6 +202,8 @@ fully processed by daemon consumer threads.
indicate that the item was retrieved and all work on it is complete. When the
count of unfinished tasks drops to zero, :meth:`join` unblocks.

Raises :exc:`ShutDown` if the queue has been shut down immediately.


Example of how to wait for enqueued tasks to be completed::

Expand Down Expand Up @@ -214,6 +231,25 @@ Example of how to wait for enqueued tasks to be completed::
print('All work completed')


Terminating queues
^^^^^^^^^^^^^^^^^^

:class:`Queue` objects can be made to prevent further interaction by shutting
them down.

.. method:: Queue.shutdown(immediate=False)

Shut-down the queue, making queue gets and puts raise :exc:`ShutDown`.

By default, gets will only raise once the queue is empty. Set
*immediate* to true to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if *immediate* is true.

.. versionadded:: 3.13


SimpleQueue Objects
-------------------

Expand Down
Loading