From 9114b1612b7cedf48b662cd91c8d73607429329e Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 27 Mar 2025 07:11:04 +0000 Subject: [PATCH 01/11] make resource_tracker re-entrant safe --- Lib/multiprocessing/resource_tracker.py | 163 ++++++++++++++---------- 1 file changed, 94 insertions(+), 69 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 05633ac21a259c..8748b50e4884cf 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -20,6 +20,7 @@ import sys import threading import warnings +from collections import deque from . import spawn from . import util @@ -66,6 +67,7 @@ def __init__(self): self._fd = None self._pid = None self._exitcode = None + self._reentrant_messages = deque() def _reentrant_call_error(self): # gh-109629: this happens if an explicit call to the ResourceTracker @@ -132,69 +134,100 @@ def ensure_running(self): This can be run from any process. Usually a child process will use the resource created by its parent.''' + return self._ensure_running_and_write() + + def _teardown_dead_process(self): + os.close(self._fd) + + # Clean-up to avoid dangling processes. + try: + # _pid can be None if this process is a child from another + # python process, which has started the resource_tracker. + if self._pid is not None: + os.waitpid(self._pid, 0) + except ChildProcessError: + # The resource_tracker has already been terminated. + pass + self._fd = None + self._pid = None + self._exitcode = None + + warnings.warn('resource_tracker: process died unexpectedly, ' + 'relaunching. Some resources might leak.') + + def _launch(self): + fds_to_pass = [] + try: + fds_to_pass.append(sys.stderr.fileno()) + except Exception: + pass + cmd = 'from multiprocessing.resource_tracker import main;main(%d)' + r, w = os.pipe() + try: + fds_to_pass.append(r) + # process will out live us, so no need to wait on pid + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + args += ['-c', cmd % r] + # bpo-33613: Register a signal mask that will block the signals. + # This signal mask will be inherited by the child that is going + # to be spawned and will protect the child from a race condition + # that can make the child die before it registers signal handlers + # for SIGINT and SIGTERM. The mask is unregistered after spawning + # the child. + prev_sigmask = None + try: + if _HAVE_SIGMASK: + prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) + pid = util.spawnv_passfds(exe, args, fds_to_pass) + finally: + if prev_sigmask is not None: + signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask) + except: + os.close(w) + raise + else: + self._fd = w + self._pid = pid + finally: + os.close(r) + + def _ensure_running_and_write(self, msg=None): with self._lock: if self._lock._recursion_count() > 1: # The code below is certainly not reentrant-safe, so bail out - return self._reentrant_call_error() + if msg is None: + return self._reentrant_call_error() + return self._reentrant_messages.append(msg) + if self._fd is not None: # resource tracker was launched before, is it still running? - if self._check_alive(): - # => still alive - return - # => dead, launch it again - os.close(self._fd) - - # Clean-up to avoid dangling processes. + if msg is None: + to_send = b'PROBE:0:noop\n' + else: + to_send = msg try: - # _pid can be None if this process is a child from another - # python process, which has started the resource_tracker. - if self._pid is not None: - os.waitpid(self._pid, 0) - except ChildProcessError: - # The resource_tracker has already been terminated. - pass - self._fd = None - self._pid = None - self._exitcode = None - - warnings.warn('resource_tracker: process died unexpectedly, ' - 'relaunching. Some resources might leak.') + self._write(to_send) + except OSError: + dead = True + else: + dead = False + if dead: + self._teardown_dead_process() + self._launch() + + msg = None # message was sent in probe + else: + self._launch() - fds_to_pass = [] - try: - fds_to_pass.append(sys.stderr.fileno()) - except Exception: - pass - cmd = 'from multiprocessing.resource_tracker import main;main(%d)' - r, w = os.pipe() + while True: try: - fds_to_pass.append(r) - # process will out live us, so no need to wait on pid - exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() - args += ['-c', cmd % r] - # bpo-33613: Register a signal mask that will block the signals. - # This signal mask will be inherited by the child that is going - # to be spawned and will protect the child from a race condition - # that can make the child die before it registers signal handlers - # for SIGINT and SIGTERM. The mask is unregistered after spawning - # the child. - prev_sigmask = None - try: - if _HAVE_SIGMASK: - prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) - pid = util.spawnv_passfds(exe, args, fds_to_pass) - finally: - if prev_sigmask is not None: - signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask) - except: - os.close(w) - raise - else: - self._fd = w - self._pid = pid - finally: - os.close(r) + reentrant_msg = self._reentrant_messages.popleft() + except IndexError: + break + self._write(reentrant_msg) + if msg is not None: + self._write(msg) def _check_alive(self): '''Check that the pipe has not been closed by sending a probe.''' @@ -215,27 +248,19 @@ def unregister(self, name, rtype): '''Unregister name of resource with resource tracker.''' self._send('UNREGISTER', name, rtype) + def _write(self, msg): + nbytes = os.write(self._fd, msg) + assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( + nbytes, len(msg)) + def _send(self, cmd, name, rtype): - try: - self.ensure_running() - except ReentrantCallError: - # The code below might or might not work, depending on whether - # the resource tracker was already running and still alive. - # Better warn the user. - # (XXX is warnings.warn itself reentrant-safe? :-) - warnings.warn( - f"ResourceTracker called reentrantly for resource cleanup, " - f"which is unsupported. " - f"The {rtype} object {name!r} might leak.") msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') if len(msg) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF # bytes are atomic, and that PIPE_BUF >= 512 raise ValueError('msg too long') - nbytes = os.write(self._fd, msg) - assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( - nbytes, len(msg)) + self._ensure_running_and_write(msg) _resource_tracker = ResourceTracker() ensure_running = _resource_tracker.ensure_running From 60ac5f9feb6b04c0a8183481100ab407f437ae1a Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Thu, 27 Mar 2025 08:13:41 +0000 Subject: [PATCH 02/11] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst diff --git a/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst new file mode 100644 index 00000000000000..9d7cbe7c72d1e3 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst @@ -0,0 +1 @@ +Make ``ResourceTracker.send`` re-entrant safe From 52b2100e8fcd87902a77f35f64324776ef68d3c5 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 07:33:38 +0000 Subject: [PATCH 03/11] Update Lib/multiprocessing/resource_tracker.py Co-authored-by: Gregory P. Smith --- Lib/multiprocessing/resource_tracker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 8748b50e4884cf..80bb9ce9b0e30a 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -250,8 +250,7 @@ def unregister(self, name, rtype): def _write(self, msg): nbytes = os.write(self._fd, msg) - assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( - nbytes, len(msg)) + assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}" def _send(self, cmd, name, rtype): msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') From ce829682aa92711e2c4c5f7432c88d547f09faf0 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 07:33:50 +0000 Subject: [PATCH 04/11] Update Lib/multiprocessing/resource_tracker.py Co-authored-by: Gregory P. Smith --- Lib/multiprocessing/resource_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 80bb9ce9b0e30a..363c22ed335fb9 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -253,7 +253,7 @@ def _write(self, msg): assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}" def _send(self, cmd, name, rtype): - msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') + msg = f"{cmd}:{name}:{rtype}\n".encode("ascii") if len(msg) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF # bytes are atomic, and that PIPE_BUF >= 512 From 15e73a999580b0a43d79e11f0a3279d310e923ef Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 07:34:31 +0000 Subject: [PATCH 05/11] Update Lib/multiprocessing/resource_tracker.py Co-authored-by: Gregory P. Smith --- Lib/multiprocessing/resource_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 363c22ed335fb9..9f2b29691f8b63 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -197,7 +197,7 @@ def _ensure_running_and_write(self, msg=None): if self._lock._recursion_count() > 1: # The code below is certainly not reentrant-safe, so bail out if msg is None: - return self._reentrant_call_error() + raise self._reentrant_call_error() return self._reentrant_messages.append(msg) if self._fd is not None: From 174a3ff5a39055c1e96dd1a2880d8094ea3b80b6 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 07:52:41 +0000 Subject: [PATCH 06/11] trim trailing whitespace --- Lib/multiprocessing/resource_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 9f2b29691f8b63..a06fb6ef691c0c 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -253,7 +253,7 @@ def _write(self, msg): assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}" def _send(self, cmd, name, rtype): - msg = f"{cmd}:{name}:{rtype}\n".encode("ascii") + msg = f"{cmd}:{name}:{rtype}\n".encode("ascii") if len(msg) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF # bytes are atomic, and that PIPE_BUF >= 512 From 5d6406c183bd1653605cd0632444fab5aa8f5d9d Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 07:59:15 +0000 Subject: [PATCH 07/11] use f-string and args = [x, *y, z] --- Lib/multiprocessing/resource_tracker.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index a06fb6ef691c0c..7d3b8dfe3470fd 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -167,8 +167,12 @@ def _launch(self): fds_to_pass.append(r) # process will out live us, so no need to wait on pid exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() - args += ['-c', cmd % r] + args = [ + exe, + *util._args_from_interpreter_flags(), + '-c', + f'from multiprocessing.resource_tracker import main;main({r})', + ] # bpo-33613: Register a signal mask that will block the signals. # This signal mask will be inherited by the child that is going # to be spawned and will protect the child from a race condition From 395d70611da1f9e904fe5bab866bf1966178fea5 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 08:04:38 +0000 Subject: [PATCH 08/11] raise self._reentrant_call_error --- Lib/multiprocessing/resource_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 7d3b8dfe3470fd..366cd6e71c2ce6 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -104,7 +104,7 @@ def _stop_locked( # This shouldn't happen (it might when called by a finalizer) # so we check for it anyway. if self._lock._recursion_count() > 1: - return self._reentrant_call_error() + raise self._reentrant_call_error() if self._fd is None: # not running return From 33311621a2a08e8c60710a8ec0cb33afc6edbb58 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 16:54:41 +0000 Subject: [PATCH 09/11] Update Lib/multiprocessing/resource_tracker.py Co-authored-by: Gregory P. Smith --- Lib/multiprocessing/resource_tracker.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 366cd6e71c2ce6..7dac3e7c7eb430 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -213,10 +213,6 @@ def _ensure_running_and_write(self, msg=None): try: self._write(to_send) except OSError: - dead = True - else: - dead = False - if dead: self._teardown_dead_process() self._launch() From a2c537d751c172c4d6d8ae721cc0fb2e578900b6 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 17:14:41 +0000 Subject: [PATCH 10/11] Update Lib/multiprocessing/resource_tracker.py --- Lib/multiprocessing/resource_tracker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 7dac3e7c7eb430..2249948da98b00 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -161,7 +161,6 @@ def _launch(self): fds_to_pass.append(sys.stderr.fileno()) except Exception: pass - cmd = 'from multiprocessing.resource_tracker import main;main(%d)' r, w = os.pipe() try: fds_to_pass.append(r) From 5351b10df2e7794b84903ea3e78a66e4c2746196 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 29 Mar 2025 17:16:21 +0000 Subject: [PATCH 11/11] Update Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst --- .../next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst index 9d7cbe7c72d1e3..525802405bd8bd 100644 --- a/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst +++ b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst @@ -1 +1 @@ -Make ``ResourceTracker.send`` re-entrant safe +Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe