From de1c4870a94097805046ad93829f728ffb20c876 Mon Sep 17 00:00:00 2001 From: Louis Paulot <55740424+lpaulot@users.noreply.github.com> Date: Tue, 12 Jul 2022 19:06:45 +0200 Subject: [PATCH 1/6] Fix hang in process pool executor. --- Lib/concurrent/futures/process.py | 9 +++++++++ Lib/test/test_concurrent_futures.py | 14 ++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7e2f5fa30e8264..97b2aaccddc039 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -492,6 +492,8 @@ def terminate_broken(self, cause): for p in self.processes.values(): p.terminate() + self.drain_call_queue() + # clean up resources self.join_executor_internals() @@ -551,6 +553,13 @@ def get_n_children_alive(self): # This is an upper bound on the number of children alive. return sum(p.is_alive() for p in self.processes.values()) + def drain_call_queue(self): + while True: + try: + self.call_queue.get(timeout=0.1) + except queue.Empty: + return + _system_limits_checked = False _system_limited = None diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index f50255bd575601..de40b6a8f9757c 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1113,6 +1113,11 @@ def _crash(delay=None): faulthandler._sigsegv() +def _crash_with_data(data): + """Induces a segfault with dummy data in input.""" + _crash() + + def _exit(): """Induces a sys exit with exitcode 1.""" sys.exit(1) @@ -1312,6 +1317,15 @@ def test_shutdown_deadlock_pickle(self): # dangling threads executor_manager.join() + def test_crash_big_data(self): + self.executor.shutdown(wait=True) + data = "a" * support.PIPE_MAX_SIZE + with self.executor_type(max_workers=2, + mp_context=self.get_context()) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + with self.assertRaises(BrokenProcessPool): + list(executor.map(_crash_with_data, [data] * 10)) + create_executor_tests(ExecutorDeadlockTest, executor_mixins=(ProcessPoolForkMixin, From 21a1ae4c9fbcfb9c202f849fbcb47c7ede4621aa Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Tue, 12 Jul 2022 18:45:13 +0000 Subject: [PATCH 2/6] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst diff --git a/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst b/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst new file mode 100644 index 00000000000000..b281367e1eb2df --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst @@ -0,0 +1 @@ +Fix hanging ProcessPoolExecutor when a child process crashes while data is being written in the call queue. From d82bbb8e68c8691bad5c535cab6e1381079782ba Mon Sep 17 00:00:00 2001 From: Louis Paulot <55740424+lpaulot@users.noreply.github.com> Date: Tue, 12 Jul 2022 22:00:59 +0200 Subject: [PATCH 3/6] Try to fix hanging ProcessPoolExecutor also on MacOS --- Lib/concurrent/futures/process.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 97b2aaccddc039..9e14d85bf0eab2 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -182,6 +182,12 @@ def _on_queue_feeder_error(self, e, obj): else: super()._on_queue_feeder_error(e, obj) + def drain(self): + self._closed = True + self._buffer.clear() + while self._poll(timeout=0.1): + self._recv_bytes() + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ @@ -492,7 +498,7 @@ def terminate_broken(self, cause): for p in self.processes.values(): p.terminate() - self.drain_call_queue() + self.call_queue.drain() # clean up resources self.join_executor_internals() @@ -553,13 +559,6 @@ def get_n_children_alive(self): # This is an upper bound on the number of children alive. return sum(p.is_alive() for p in self.processes.values()) - def drain_call_queue(self): - while True: - try: - self.call_queue.get(timeout=0.1) - except queue.Empty: - return - _system_limits_checked = False _system_limited = None From 8d2552e85aefd6611769cba2a2ae749d22a8f85d Mon Sep 17 00:00:00 2001 From: Louis Paulot <55740424+lpaulot@users.noreply.github.com> Date: Tue, 12 Jul 2022 22:23:21 +0200 Subject: [PATCH 4/6] Simpler fix of deadlock in ProcessPoolExecutor --- Lib/concurrent/futures/process.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9e14d85bf0eab2..998d108a77d0dd 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -182,12 +182,6 @@ def _on_queue_feeder_error(self, e, obj): else: super()._on_queue_feeder_error(e, obj) - def drain(self): - self._closed = True - self._buffer.clear() - while self._poll(timeout=0.1): - self._recv_bytes() - def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ @@ -498,7 +492,8 @@ def terminate_broken(self, cause): for p in self.processes.values(): p.terminate() - self.call_queue.drain() + # Prevent queue writing to a pipe which is no longer read. + self.call_queue._reader.close() # clean up resources self.join_executor_internals() From 2794188126850d1f42e65206f749ae3fd7bb36b5 Mon Sep 17 00:00:00 2001 From: Louis Paulot <55740424+lpaulot@users.noreply.github.com> Date: Mon, 10 Jul 2023 21:48:25 +0200 Subject: [PATCH 5/6] Reference GH issue 94777. --- Lib/concurrent/futures/process.py | 1 + Lib/test/test_concurrent_futures.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index bc65eedc899752..301207f59de37a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -500,6 +500,7 @@ def terminate_broken(self, cause): p.terminate() # Prevent queue writing to a pipe which is no longer read. + # https://github.com/python/cpython/issues/94777 self.call_queue._reader.close() # clean up resources diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 1ce4b834c159dd..39dbe234e765e8 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1377,6 +1377,10 @@ def test_shutdown_deadlock_pickle(self): executor_manager.join() def test_crash_big_data(self): + # Test that there is a clean exception instad of a deadlock when a + # child process crashes while some data is being written into the + # queue. + # https://github.com/python/cpython/issues/94777 self.executor.shutdown(wait=True) data = "a" * support.PIPE_MAX_SIZE with self.executor_type(max_workers=2, From 306e4902a1794c5464d2b8f7c7c177baffb2b0a4 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith" Date: Mon, 10 Jul 2023 14:12:08 -0700 Subject: [PATCH 6/6] ReSTify NEWS --- .../next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst b/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst index b281367e1eb2df..2c04a35fbfce13 100644 --- a/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst +++ b/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst @@ -1 +1 @@ -Fix hanging ProcessPoolExecutor when a child process crashes while data is being written in the call queue. +Fix hanging :mod:`multiprocessing` ``ProcessPoolExecutor`` when a child process crashes while data is being written in the call queue.