From cd8ceafc1b7e5eb2be601b40342b4234f6ecd0aa Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 1 Sep 2022 18:33:18 +1000 Subject: [PATCH 1/3] Add threading queue shutdown * Include docs --- Doc/library/queue.rst | 36 +++++++++++++++++++++++++++++ Lib/queue.py | 52 ++++++++++++++++++++++++++++++++++++++++++ Lib/test/test_queue.py | 35 ++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index b2b787c5a8260c..d9952f1882ae05 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -93,6 +93,14 @@ The :mod:`queue` module defines the following classes and exceptions: on a :class:`Queue` object which is full. +.. exception:: ShutDown + + Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on + a :class:`Queue` object which has been shut down. + + .. versionadded:: 3.12 + + .. _queueobjects: Queue Objects @@ -135,6 +143,8 @@ provide the public methods described below. immediately available, else raise the :exc:`Full` exception (*timeout* is ignored in that case). + Raises :exc:`ShutDown` if the queue has been shut down. + .. method:: Queue.put_nowait(item) @@ -155,6 +165,9 @@ provide the public methods described below. an uninterruptible wait on an underlying lock. This means that no exceptions can occur, and in particular a SIGINT will not trigger a :exc:`KeyboardInterrupt`. + Raises :exc:`ShutDown` if the queue has been shut down and is empty, or if + the queue has been shut down immediately. + .. method:: Queue.get_nowait() @@ -177,6 +190,8 @@ fully processed by daemon consumer threads. Raises a :exc:`ValueError` if called more times than there were items placed in the queue. + Raises :exc:`ShutDown` if the queue has been shut down immediately. + .. method:: Queue.join() @@ -187,6 +202,8 @@ fully processed by daemon consumer threads. indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, :meth:`join` unblocks. + Raises :exc:`ShutDown` if the queue has been shut down immediately. + Example of how to wait for enqueued tasks to be completed:: @@ -214,6 +231,25 @@ Example of how to wait for enqueued tasks to be completed:: print('All work completed') +Terminating queues +^^^^^^^^^^^^^^^^^^ + +:class:`Queue` objects can be made to prevent further interaction by shutting +them down. + +.. method:: Queue.shutdown(immediate=False) + + Shut-down the queue, making queue gets and puts raise :exc:`ShutDown`. + + By default, gets will only raise once the queue is empty. Set + *immediate* to true to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if *immediate* is true. + + .. versionadded:: 3.12 + + SimpleQueue Objects ------------------- diff --git a/Lib/queue.py b/Lib/queue.py index 55f50088460f9e..f6af7cb6df5cff 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -25,6 +25,15 @@ class Full(Exception): pass +class ShutDown(Exception): + '''Raised when put/get with shut-down queue.''' + + +_queue_alive = "alive" +_queue_shutdown = "shutdown" +_queue_shutdown_immediate = "shutdown-immediate" + + class Queue: '''Create a queue object with a given maximum size. @@ -54,6 +63,9 @@ def __init__(self, maxsize=0): self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 + # Queue shut-down state + self.shutdown_state = _queue_alive + def task_done(self): '''Indicate that a formerly enqueued task is complete. @@ -87,6 +99,8 @@ def join(self): ''' with self.all_tasks_done: while self.unfinished_tasks: + if self.shutdown_state == _queue_shutdown_immediate: + return self.all_tasks_done.wait() def qsize(self): @@ -130,6 +144,8 @@ def put(self, item, block=True, timeout=None): is immediately available, else raise the Full exception ('timeout' is ignored in that case). ''' + if self.shutdown_state != _queue_alive: + raise ShutDown with self.not_full: if self.maxsize > 0: if not block: @@ -138,6 +154,8 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() + if self.shutdown_state != _queue_alive: + raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -147,6 +165,8 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) + if self.shutdown_state != _queue_alive: + raise ShutDown self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() @@ -162,22 +182,36 @@ def get(self, block=True, timeout=None): available, else raise the Empty exception ('timeout' is ignored in that case). ''' + if self.shutdown_state == _queue_shutdown_immediate: + raise ShutDown with self.not_empty: if not block: if not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown raise Empty elif timeout is None: while not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown self.not_empty.wait() + if self.shutdown_state != _queue_alive: + raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) + if self.shutdown_state != _queue_alive: + raise ShutDown + if self.shutdown_state == _queue_shutdown_immediate: + raise ShutDown item = self._get() self.not_full.notify() return item @@ -198,6 +232,24 @@ def get_nowait(self): ''' return self.get(block=False) + def shutdown(self, immediate=False): + '''Shut-down the queue, making queue gets and puts raise. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if 'immediate'. The ShutDown exception is raised. + ''' + with self.mutex: + if immediate: + self.shutdown_state = _queue_shutdown_immediate + self.not_empty.notify_all() + self.all_tasks_done.notify_all() + else: + self.shutdown_state = _queue_shutdown + self.not_full.notify_all() + # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 33113a72e6b6a9..354299b9a5b16a 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -241,6 +241,41 @@ def test_shrinking_queue(self): with self.assertRaises(self.queue.Full): q.put_nowait(4) + def test_shutdown_empty(self): + q = self.type2test() + q.shutdown() + try: + q.put("data") + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + + def test_shutdown_nonempty(self): + q = self.type2test() + q.put("data") + q.shutdown() + q.get() + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + + def test_shutdown_immediate(self): + q = self.type2test() + q.put("data") + q.shutdown(immediate=True) + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + class QueueTest(BaseQueueTestMixin): def setUp(self): From 9c2971be6fe5894856c2fd8f636d53185bf18693 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:52:33 +0100 Subject: [PATCH 2/3] Fix queue shutdown * Include raised exception in docstrings * Handle queue shutdown in task_done and join * Factor out queue-state checks and updates to methods * Logic fixes in qsize, get and shutdown * Don't set unfinished_tasks to 0 on immediate shutdown * Updated tests * Document feature added in 3.13 --- Doc/library/queue.rst | 4 +- Lib/queue.py | 68 ++++--- Lib/test/test_queue.py | 395 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 429 insertions(+), 38 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index d9952f1882ae05..33d6f2f4c85e1c 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -98,7 +98,7 @@ The :mod:`queue` module defines the following classes and exceptions: Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on a :class:`Queue` object which has been shut down. - .. versionadded:: 3.12 + .. versionadded:: 3.13 .. _queueobjects: @@ -247,7 +247,7 @@ them down. All blocked callers of put() will be unblocked, and also get() and join() if *immediate* is true. - .. versionadded:: 3.12 + .. versionadded:: 3.13 SimpleQueue Objects diff --git a/Lib/queue.py b/Lib/queue.py index f6af7cb6df5cff..f8a7ba072247f0 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -33,7 +33,6 @@ class ShutDown(Exception): _queue_shutdown = "shutdown" _queue_shutdown_immediate = "shutdown-immediate" - class Queue: '''Create a queue object with a given maximum size. @@ -63,7 +62,7 @@ def __init__(self, maxsize=0): self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 - # Queue shut-down state + # Queue shutdown state self.shutdown_state = _queue_alive def task_done(self): @@ -79,8 +78,12 @@ def task_done(self): Raises a ValueError if called more times than there were items placed in the queue. + + Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: + if self._is_shutdown_immediate(): + raise ShutDown unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: @@ -96,12 +99,16 @@ def join(self): to indicate the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. + + Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: + if self._is_shutdown_immediate(): + raise ShutDown while self.unfinished_tasks: - if self.shutdown_state == _queue_shutdown_immediate: - return self.all_tasks_done.wait() + if self._is_shutdown_immediate(): + raise ShutDown def qsize(self): '''Return the approximate size of the queue (not reliable!).''' @@ -143,10 +150,12 @@ def put(self, item, block=True, timeout=None): Otherwise ('block' is false), put an item on the queue if a free slot is immediately available, else raise the Full exception ('timeout' is ignored in that case). + + Raises ShutDown if the queue has been shut down. ''' - if self.shutdown_state != _queue_alive: - raise ShutDown with self.not_full: + if not self._is_alive(): + raise ShutDown if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: @@ -154,7 +163,7 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() - if self.shutdown_state != _queue_alive: + if not self._is_alive(): raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") @@ -165,7 +174,7 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) - if self.shutdown_state != _queue_alive: + if not self._is_alive(): raise ShutDown self._put(item) self.unfinished_tasks += 1 @@ -181,37 +190,33 @@ def get(self, block=True, timeout=None): Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case). + + Raises ShutDown if the queue has been shut down and is empty, + or if the queue has been shut down immediately. ''' - if self.shutdown_state == _queue_shutdown_immediate: - raise ShutDown with self.not_empty: + if self._is_shutdown_immediate() or\ + (self._is_shutdown() and not self._qsize()): + raise ShutDown if not block: if not self._qsize(): - if self.shutdown_state != _queue_alive: - raise ShutDown raise Empty elif timeout is None: while not self._qsize(): - if self.shutdown_state != _queue_alive: - raise ShutDown self.not_empty.wait() - if self.shutdown_state != _queue_alive: + if self._is_shutdown_immediate(): raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): - if self.shutdown_state != _queue_alive: - raise ShutDown remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self.shutdown_state != _queue_alive: + if self._is_shutdown_immediate(): raise ShutDown - if self.shutdown_state == _queue_shutdown_immediate: - raise ShutDown item = self._get() self.not_full.notify() return item @@ -242,14 +247,33 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The ShutDown exception is raised. ''' with self.mutex: + if self._is_shutdown_immediate(): + return if immediate: - self.shutdown_state = _queue_shutdown_immediate + self._set_shutdown_immediate() self.not_empty.notify_all() + # release all blocked threads in `join()` self.all_tasks_done.notify_all() else: - self.shutdown_state = _queue_shutdown + self._set_shutdown() self.not_full.notify_all() + def _is_alive(self): + return self.shutdown_state == _queue_alive + + def _is_shutdown(self): + return self.shutdown_state == _queue_shutdown + + def _is_shutdown_immediate(self): + return self.shutdown_state == _queue_shutdown_immediate + + def _set_shutdown(self): + self.shutdown_state = _queue_shutdown + + def _set_shutdown_immediate(self): + self.shutdown_state = _queue_shutdown_immediate + + # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 354299b9a5b16a..d9e840a7c861ed 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -244,38 +244,405 @@ def test_shrinking_queue(self): def test_shutdown_empty(self): q = self.type2test() q.shutdown() - try: + with self.assertRaises(self.queue.ShutDown): q.put("data") - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass def test_shutdown_nonempty(self): q = self.type2test() q.put("data") q.shutdown() q.get() - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass def test_shutdown_immediate(self): q = self.type2test() q.put("data") q.shutdown(immediate=True) - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") + + def test_shutdown_allowed_transitions(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.type2test() + self.assertEqual("alive", q.shutdown_state) + + q.shutdown() + self.assertEqual("shutdown", q.shutdown_state) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q.shutdown_state) + + q.shutdown(immediate=False) + self.assertNotEqual("shutdown", q.shutdown_state) + + def _shutdown_all_methods_in_one_thread(self, immediate): + q = self.type2test(2) + q.put("L") + q.put_nowait("O") + q.shutdown(immediate) + + with self.assertRaises(self.queue.ShutDown): + q.put("E") + with self.assertRaises(self.queue.ShutDown): + q.put_nowait("W") + if immediate: + with self.assertRaises(self.queue.ShutDown): + q.get() + with self.assertRaises(self.queue.ShutDown): + q.get_nowait() + with self.assertRaises(self.queue.ShutDown): + q.task_done() + with self.assertRaises(self.queue.ShutDown): + q.join() + else: + self.assertIn(q.get(), "LO") + q.task_done() + self.assertIn(q.get(), "LO") + q.task_done() + q.join() + # on shutdown(immediate=False) + # when queue is empty, should raise ShutDown Exception + with self.assertRaises(self.queue.ShutDown): + q.get() # p.get(True) + with self.assertRaises(self.queue.ShutDown): + q.get_nowait() # p.get(False) + with self.assertRaises(self.queue.ShutDown): + q.get(True, 1.0) + + def test_shutdown_all_methods_in_one_thread(self): + return self._shutdown_all_methods_in_one_thread(False) + + def test_shutdown_immediate_all_methods_in_one_thread(self): + return self._shutdown_all_methods_in_one_thread(True) + + def _write_msg_thread(self, q, n, results, delay, + i_when_exec_shutdown, + event_start, event_end): + event_start.wait() + for i in range(1, n+1): + try: + q.put((i, "YDLO")) + results.append(True) + except self.queue.ShutDown: + results.append(False) + # triggers shutdown of queue + if i == i_when_exec_shutdown: + event_end.set() + time.sleep(delay) + # end of all puts + try: + q.join() + except self.queue.ShutDown: + pass + + def _read_msg_thread(self, q, nb, results, delay, event_start): + event_start.wait() + block = True + while nb: + time.sleep(delay) + try: + # Get at least one message + q.get(block) + block = False + q.task_done() + results.append(True) + nb -= 1 + except self.queue.ShutDown: + results.append(False) + nb -= 1 + except self.queue.Empty: + pass + try: + q.join() + except self.queue.ShutDown: + pass + + def _shutdown_thread(self, q, event_end, immediate): + event_end.wait() + q.shutdown(immediate) + try: + q.join() except self.queue.ShutDown: pass + def _join_thread(self, q, delay, event_start): + event_start.wait() + time.sleep(delay) + try: + q.join() + except self.queue.ShutDown: + pass + + def _shutdown_all_methods_in_many_threads(self, immediate): + q = self.type2test() + ps = [] + ev_start = threading.Event() + ev_exec_shutdown = threading.Event() + res_puts = [] + res_gets = [] + delay = 1e-4 + read_process = 4 + nb_msgs = read_process * 16 + nb_msgs_r = nb_msgs // read_process + when_exec_shutdown = nb_msgs // 2 + lprocs = ( + (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay, + when_exec_shutdown, + ev_start, ev_exec_shutdown)), + (self._read_msg_thread, read_process, (q, nb_msgs_r, + res_gets, delay*2, + ev_start)), + (self._join_thread, 2, (q, delay*2, ev_start)), + (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), + ) + # start all threds + for func, n, args in lprocs: + for i in range(n): + ps.append(threading.Thread(target=func, args=args)) + ps[-1].start() + # set event in order to run q.shutdown() + ev_start.set() + + if not immediate: + assert(len(res_gets) == len(res_puts)) + assert(res_gets.count(True) == res_puts.count(True)) + else: + assert(len(res_gets) <= len(res_puts)) + assert(res_gets.count(True) <= res_puts.count(True)) + + def test_shutdown_all_methods_in_many_threads(self): + return self._shutdown_all_methods_in_many_threads(False) + + def test_shutdown_immediate_all_methods_in_many_threads(self): + return self._shutdown_all_methods_in_many_threads(True) + + def _get(self, q, go, results, shutdown=False): + go.wait() + try: + msg = q.get() + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _get_shutdown(self, q, go, results): + return self._get(q, go, results, True) + + def _get_task_done(self, q, go, results): + go.wait() + try: + msg = q.get() + q.task_done() + results.append(True) + return msg + except self.queue.ShutDown: + results.append(False) + return False + + def _put(self, q, msg, go, results, shutdown=False): + go.wait() + try: + q.put(msg) + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _put_shutdown(self, q, msg, go, results): + return self._put(q, msg, go, results, True) + + def _join(self, q, results, shutdown=False): + try: + q.join() + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _join_shutdown(self, q, results): + return self._join(q, results, True) + + def _shutdown_get(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + # queue full + + if immediate: + thrds = ( + (self._get_shutdown, (q, go, results)), + (self._get_shutdown, (q, go, results)), + ) + else: + thrds = ( + # on shutdown(immediate=False) + # one of these threads shoud raise Shutdown + (self._get, (q, go, results)), + (self._get, (q, go, results)), + (self._get, (q, go, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + q.shutdown(immediate) + go.set() + for t in threads: + t.join() + if immediate: + self.assertListEqual(results, [True, True]) + else: + self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1)) + + def test_shutdown_get(self): + return self._shutdown_get(False) + + def test_shutdown_immediate_get(self): + return self._shutdown_get(True) + + def _shutdown_put(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + # queue fulled + + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._put_shutdown, (q, "W", go, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + q.shutdown() + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_put(self): + return self._shutdown_put(False) + + def test_shutdown_immediate_put(self): + return self._shutdown_put(True) + + def _shutdown_join(self, immediate): + q = self.type2test() + results = [] + q.put("Y") + go = threading.Event() + nb = q.qsize() + + if immediate: + thrds = ( + (self._join_shutdown, (q, results)), + (self._join_shutdown, (q, results)), + ) + else: + thrds = ( + (self._join, (q, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + if not immediate: + res = [] + for i in range(nb): + threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res))) + threads[-1].start() + q.shutdown(immediate) + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_immediate_join(self): + return self._shutdown_join(True) + + def test_shutdown_join(self): + return self._shutdown_join(False) + + def _shutdown_put_join(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + nb = q.qsize() + # queue not fulled + + if immediate: + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._join_shutdown, (q, results)), + ) + else: + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + if not immediate: + self.assertEqual(q.unfinished_tasks, nb) + for i in range(nb): + t = threading.Thread(target=q.task_done) + t.start() + threads.append(t) + go.set() + q.shutdown(immediate) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_immediate_put_join(self): + return self._shutdown_put_join(True) + + def test_shutdown_put_join(self): + return self._shutdown_put_join(False) + + def test_shutdown_get_task_done_join(self): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + self.assertEqual(q.unfinished_tasks, q.qsize()) + + thrds = ( + (self._get_task_done, (q, go, results)), + (self._get_task_done, (q, go, results)), + (self._join, (q, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + go.set() + q.shutdown(False) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + class QueueTest(BaseQueueTestMixin): def setUp(self): From 089eb960ac306d0cb2fd5f681565461ce12e58a0 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 6 May 2023 04:57:15 +0000 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst diff --git a/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst b/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst new file mode 100644 index 00000000000000..0ec0fa65e7d91b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst @@ -0,0 +1 @@ +Add queue.Queue termination with ``shutdown`` method