From ff7ccd77b68567a0205c3b75de60924d06107fb4 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Thu, 9 May 2019 16:38:38 +0200 Subject: [PATCH 01/19] MNT refactor semaphore_tracker -> resource_tracker --- Lib/multiprocessing/forkserver.py | 8 +-- Lib/multiprocessing/popen_spawn_posix.py | 4 +- ...maphore_tracker.py => resource_tracker.py} | 56 +++++++++---------- Lib/multiprocessing/spawn.py | 4 +- Lib/multiprocessing/synchronize.py | 4 +- Lib/test/_test_multiprocessing.py | 56 +++++++++---------- PCbuild/lib.pyproj | 2 +- 7 files changed, 67 insertions(+), 67 deletions(-) rename Lib/multiprocessing/{semaphore_tracker.py => resource_tracker.py} (75%) diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 040b46e66a0330..dabf7bcbe6d785 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -11,7 +11,7 @@ from . import connection from . import process from .context import reduction -from . import semaphore_tracker +from . import resource_tracker from . import spawn from . import util @@ -69,7 +69,7 @@ def connect_to_new_process(self, fds): parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() allfds = [child_r, child_w, self._forkserver_alive_fd, - semaphore_tracker.getfd()] + resource_tracker.getfd()] allfds += fds try: reduction.sendfds(client, allfds) @@ -90,7 +90,7 @@ def ensure_running(self): ensure_running() will do nothing. ''' with self._lock: - semaphore_tracker.ensure_running() + resource_tracker.ensure_running() if self._forkserver_pid is not None: # forkserver was launched before, is it still running? pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) @@ -290,7 +290,7 @@ def _serve_one(child_r, fds, unused_fds, handlers): os.close(fd) (_forkserver._forkserver_alive_fd, - semaphore_tracker._semaphore_tracker._fd, + resource_tracker._resource_tracker._fd, *_forkserver._inherited_fds) = fds # Run process object received over pipe diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index 38151060efa2e3..59f8e452cae1d5 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -36,8 +36,8 @@ def duplicate_for_child(self, fd): return fd def _launch(self, process_obj): - from . import semaphore_tracker - tracker_fd = semaphore_tracker.getfd() + from . import resource_tracker + tracker_fd = resource_tracker.getfd() self._fds.append(tracker_fd) prep_data = spawn.get_preparation_data(process_obj._name) fp = io.BytesIO() diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/resource_tracker.py similarity index 75% rename from Lib/multiprocessing/semaphore_tracker.py rename to Lib/multiprocessing/resource_tracker.py index 3c2c3ad61aeeec..480eed1b1bb7b8 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -1,14 +1,14 @@ # # On Unix we run a server process which keeps track of unlinked -# semaphores. The server ignores SIGINT and SIGTERM and reads from a +# resources. The server ignores SIGINT and SIGTERM and reads from a # pipe. Every other process of the program has a copy of the writable # end of the pipe, so we get EOF when all other processes have exited. -# Then the server process unlinks any remaining semaphore names. +# Then the server process unlinks any remaining resource names. # # This is important because the system only supports a limited number -# of named semaphores, and they will not be automatically removed till -# the next reboot. Without this semaphore tracker process, "killall -# python" would probably leave unlinked semaphores. +# of named resources, and they will not be automatically removed till +# the next reboot. Without this resource tracker process, "killall +# python" would probably leave unlinked resources. # import os @@ -27,7 +27,7 @@ _IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) -class SemaphoreTracker(object): +class ResourceTracker(object): def __init__(self): self._lock = threading.Lock() @@ -39,13 +39,13 @@ def getfd(self): return self._fd def ensure_running(self): - '''Make sure that semaphore tracker process is running. + '''Make sure that resource tracker process is running. This can be run from any process. Usually a child process will use - the semaphore created by its parent.''' + the resource created by its parent.''' with self._lock: if self._fd is not None: - # semaphore tracker was launched before, is it still running? + # resource tracker was launched before, is it still running? if self._check_alive(): # => still alive return @@ -55,24 +55,24 @@ def ensure_running(self): # Clean-up to avoid dangling processes. try: # _pid can be None if this process is a child from another - # python process, which has started the semaphore_tracker. + # python process, which has started the resource_tracker. if self._pid is not None: os.waitpid(self._pid, 0) except ChildProcessError: - # The semaphore_tracker has already been terminated. + # The resource_tracker has already been terminated. pass self._fd = None self._pid = None - warnings.warn('semaphore_tracker: process died unexpectedly, ' - 'relaunching. Some semaphores might leak.') + warnings.warn('resource_tracker: process died unexpectedly, ' + 'relaunching. Some resources might leak.') fds_to_pass = [] try: fds_to_pass.append(sys.stderr.fileno()) except Exception: pass - cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)' + cmd = 'from multiprocessing.resource_tracker import main;main(%d)' r, w = os.pipe() try: fds_to_pass.append(r) @@ -114,11 +114,11 @@ def _check_alive(self): return True def register(self, name): - '''Register name of semaphore with semaphore tracker.''' + '''Register name of resource with resource tracker.''' self._send('REGISTER', name) def unregister(self, name): - '''Unregister name of semaphore with semaphore tracker.''' + '''Unregister name of resource with resource tracker.''' self._send('UNREGISTER', name) def _send(self, cmd, name): @@ -133,14 +133,14 @@ def _send(self, cmd, name): nbytes, len(msg)) -_semaphore_tracker = SemaphoreTracker() -ensure_running = _semaphore_tracker.ensure_running -register = _semaphore_tracker.register -unregister = _semaphore_tracker.unregister -getfd = _semaphore_tracker.getfd +_resource_tracker = ResourceTracker() +ensure_running = _resource_tracker.ensure_running +register = _resource_tracker.register +unregister = _resource_tracker.unregister +getfd = _resource_tracker.getfd def main(fd): - '''Run semaphore tracker.''' + '''Run resource tracker.''' # protect the process from ^C and "killall python" etc signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) @@ -155,7 +155,7 @@ def main(fd): cache = set() try: - # keep track of registered/unregistered semaphores + # keep track of registered/unregistered resources with open(fd, 'rb') as f: for line in f: try: @@ -174,23 +174,23 @@ def main(fd): except: pass finally: - # all processes have terminated; cleanup any remaining semaphores + # all processes have terminated; cleanup any remaining resources if cache: try: - warnings.warn('semaphore_tracker: There appear to be %d ' - 'leaked semaphores to clean up at shutdown' % + warnings.warn('resource_tracker: There appear to be %d ' + 'leaked resources to clean up at shutdown' % len(cache)) except Exception: pass for name in cache: # For some reason the process which created and registered this - # semaphore has failed to unregister it. Presumably it has died. + # resource has failed to unregister it. Presumably it has died. # We therefore unlink it. try: name = name.decode('ascii') try: _multiprocessing.sem_unlink(name) except Exception as e: - warnings.warn('semaphore_tracker: %r: %s' % (name, e)) + warnings.warn('resource_tracker: %r: %s' % (name, e)) finally: pass diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 6759351f13abce..f66b5aa9267b6d 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -111,8 +111,8 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): _winapi.CloseHandle(source_process) fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) else: - from . import semaphore_tracker - semaphore_tracker._semaphore_tracker._fd = tracker_fd + from . import resource_tracker + resource_tracker._resource_tracker._fd = tracker_fd fd = pipe_handle exitcode = _main(fd) sys.exit(exitcode) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 5137c49c1b6c9b..175ad828f02c18 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -76,14 +76,14 @@ def _after_fork(obj): # We only get here if we are on Unix with forking # disabled. When the object is garbage collected or the # process shuts down we unlink the semaphore name - from .semaphore_tracker import register + from .resource_tracker import register register(self._semlock.name) util.Finalize(self, SemLock._cleanup, (self._semlock.name,), exitpriority=0) @staticmethod def _cleanup(name): - from .semaphore_tracker import unregister + from .resource_tracker import unregister sem_unlink(name) unregister(name) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 836fde88cd266d..22636f546b4e8b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4803,9 +4803,9 @@ def test_preload_resources(self): @unittest.skipIf(sys.platform == "win32", "test semantics don't make sense on Windows") -class TestSemaphoreTracker(unittest.TestCase): +class TestResourceTracker(unittest.TestCase): - def test_semaphore_tracker(self): + def test_resource_tracker(self): # # Check that killing process does not leak named semaphores # @@ -4838,22 +4838,22 @@ def test_semaphore_tracker(self): self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) err = p.stderr.read().decode('utf-8') p.stderr.close() - expected = 'semaphore_tracker: There appear to be 2 leaked semaphores' + expected = 'resource_tracker: There appear to be 2 leaked semaphores' self.assertRegex(err, expected) - self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) + self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) - def check_semaphore_tracker_death(self, signum, should_die): + def check_resource_tracker_death(self, signum, should_die): # bpo-31310: if the semaphore tracker process has died, it should # be restarted implicitly. - from multiprocessing.semaphore_tracker import _semaphore_tracker - pid = _semaphore_tracker._pid + from multiprocessing.resource_tracker import _resource_tracker + pid = _resource_tracker._pid if pid is not None: os.kill(pid, signal.SIGKILL) os.waitpid(pid, 0) with warnings.catch_warnings(): warnings.simplefilter("ignore") - _semaphore_tracker.ensure_running() - pid = _semaphore_tracker._pid + _resource_tracker.ensure_running() + pid = _resource_tracker._pid os.kill(pid, signum) time.sleep(1.0) # give it time to die @@ -4874,50 +4874,50 @@ def check_semaphore_tracker_death(self, signum, should_die): self.assertEqual(len(all_warn), 1) the_warn = all_warn[0] self.assertTrue(issubclass(the_warn.category, UserWarning)) - self.assertTrue("semaphore_tracker: process died" + self.assertTrue("resource_tracker: process died" in str(the_warn.message)) else: self.assertEqual(len(all_warn), 0) - def test_semaphore_tracker_sigint(self): + def test_resource_tracker_sigint(self): # Catchable signal (ignored by semaphore tracker) - self.check_semaphore_tracker_death(signal.SIGINT, False) + self.check_resource_tracker_death(signal.SIGINT, False) - def test_semaphore_tracker_sigterm(self): + def test_resource_tracker_sigterm(self): # Catchable signal (ignored by semaphore tracker) - self.check_semaphore_tracker_death(signal.SIGTERM, False) + self.check_resource_tracker_death(signal.SIGTERM, False) - def test_semaphore_tracker_sigkill(self): + def test_resource_tracker_sigkill(self): # Uncatchable signal. - self.check_semaphore_tracker_death(signal.SIGKILL, True) + self.check_resource_tracker_death(signal.SIGKILL, True) @staticmethod - def _is_semaphore_tracker_reused(conn, pid): - from multiprocessing.semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() + def _is_resource_tracker_reused(conn, pid): + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker.ensure_running() # The pid should be None in the child process, expect for the fork # context. It should not be a new value. - reused = _semaphore_tracker._pid in (None, pid) - reused &= _semaphore_tracker._check_alive() + reused = _resource_tracker._pid in (None, pid) + reused &= _resource_tracker._check_alive() conn.send(reused) - def test_semaphore_tracker_reused(self): - from multiprocessing.semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() - pid = _semaphore_tracker._pid + def test_resource_tracker_reused(self): + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker.ensure_running() + pid = _resource_tracker._pid r, w = multiprocessing.Pipe(duplex=False) - p = multiprocessing.Process(target=self._is_semaphore_tracker_reused, + p = multiprocessing.Process(target=self._is_resource_tracker_reused, args=(w, pid)) p.start() - is_semaphore_tracker_reused = r.recv() + is_resource_tracker_reused = r.recv() # Clean up p.join() w.close() r.close() - self.assertTrue(is_semaphore_tracker_reused) + self.assertTrue(is_resource_tracker_reused) class TestSimpleQueue(unittest.TestCase): diff --git a/PCbuild/lib.pyproj b/PCbuild/lib.pyproj index ffb95c6efcdb7c..7ed71bd819cb5f 100644 --- a/PCbuild/lib.pyproj +++ b/PCbuild/lib.pyproj @@ -678,7 +678,7 @@ - + From 24396f4824251c4094b5a651f13b0ed491f61c4c Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Thu, 9 May 2019 16:47:54 +0200 Subject: [PATCH 02/19] DOC adjust module level introduction commment --- Lib/multiprocessing/resource_tracker.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 480eed1b1bb7b8..698c28fdba3f6b 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -1,3 +1,6 @@ +############################################################################### +# Server process to keep track of unlinked resources (like shared memory +# segments, semaphores etc.) and clean them. # # On Unix we run a server process which keeps track of unlinked # resources. The server ignores SIGINT and SIGTERM and reads from a @@ -5,11 +8,12 @@ # end of the pipe, so we get EOF when all other processes have exited. # Then the server process unlinks any remaining resource names. # -# This is important because the system only supports a limited number -# of named resources, and they will not be automatically removed till -# the next reboot. Without this resource tracker process, "killall -# python" would probably leave unlinked resources. -# +# This is important because there may be system limits for such resources: for +# instance, the system only supports a limited number of named semaphores, and +# shared-memory segments live in the RAM. If a python process leaks such a +# resoucre, this resource will not be removed till the next reboot. Without +# this resource tracker process, "killall python" would probably leave unlinked +# resources. import os import signal From d6c1ae51d940e8e521521a550631a72e2c50ce03 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Thu, 9 May 2019 17:08:06 +0200 Subject: [PATCH 03/19] ENH extend ResourceTracker to arbitrary resources --- Lib/multiprocessing/resource_tracker.py | 76 ++++++++++++++----------- Lib/multiprocessing/synchronize.py | 4 +- 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 698c28fdba3f6b..b17cd1e3ca920e 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -16,6 +16,7 @@ # resources. import os +import shutil import signal import sys import threading @@ -30,6 +31,11 @@ _HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') _IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) +_CLEANUP_FUNCS = { + 'folder': shutil.rmtree, + 'semaphore': _multiprocessing.sem_unlink +} + class ResourceTracker(object): @@ -111,23 +117,23 @@ def _check_alive(self): try: # We cannot use send here as it calls ensure_running, creating # a cycle. - os.write(self._fd, b'PROBE:0\n') + os.write(self._fd, b'PROBE:0:folder\n') except OSError: return False else: return True - def register(self, name): + def register(self, name, rtype): '''Register name of resource with resource tracker.''' - self._send('REGISTER', name) + self._send('REGISTER', name, rtype) - def unregister(self, name): + def unregister(self, name, rtype): '''Unregister name of resource with resource tracker.''' - self._send('UNREGISTER', name) + self._send('UNREGISTER', name, rtype) - def _send(self, cmd, name): + def _send(self, cmd, name, rtype): self.ensure_running() - msg = '{0}:{1}\n'.format(cmd, name).encode('ascii') + msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') if len(name) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF # bytes are atomic, and that PIPE_BUF >= 512 @@ -157,18 +163,24 @@ def main(fd): except Exception: pass - cache = set() + cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()} try: # keep track of registered/unregistered resources with open(fd, 'rb') as f: for line in f: try: - cmd, name = line.strip().split(b':') - if cmd == b'REGISTER': - cache.add(name) - elif cmd == b'UNREGISTER': - cache.remove(name) - elif cmd == b'PROBE': + cmd, name, rtype = line.strip().decode('ascii').split(':') + cleanup_func = _CLEANUP_FUNCS.get(rtype, None) + if cleanup_func is None: + raise ValueError('Cannot register for automatic ' + 'cleanup: unknown resource type {}' + .format(name, rtype)) + + if cmd == 'REGISTER': + cache[rtype].add(name) + elif cmd == 'UNREGISTER': + cache[rtype].remove(name) + elif cmd == 'PROBE': pass else: raise RuntimeError('unrecognized command %r' % cmd) @@ -179,22 +191,22 @@ def main(fd): pass finally: # all processes have terminated; cleanup any remaining resources - if cache: - try: - warnings.warn('resource_tracker: There appear to be %d ' - 'leaked resources to clean up at shutdown' % - len(cache)) - except Exception: - pass - for name in cache: - # For some reason the process which created and registered this - # resource has failed to unregister it. Presumably it has died. - # We therefore unlink it. - try: - name = name.decode('ascii') + for rtype, rtype_cache in cache.items(): + if rtype_cache: try: - _multiprocessing.sem_unlink(name) - except Exception as e: - warnings.warn('resource_tracker: %r: %s' % (name, e)) - finally: - pass + warnings.warn('resource_tracker: There appear to be %d ' + 'leaked %ss to clean up at shutdown' % + (len(rtype_cache), rtype)) + except Exception: + pass + for name in rtype_cache: + # For some reason the process which created and registered this + # resource has failed to unregister it. Presumably it has + # died. We therefore unlink it. + try: + try: + _CLEANUP_FUNCS[rtype](name) + except Exception as e: + warnings.warn('resource_tracker: %s: %r' % (name, e)) + finally: + pass diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 175ad828f02c18..4fcbefc8bbefd3 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -77,7 +77,7 @@ def _after_fork(obj): # disabled. When the object is garbage collected or the # process shuts down we unlink the semaphore name from .resource_tracker import register - register(self._semlock.name) + register(self._semlock.name, "semaphore") util.Finalize(self, SemLock._cleanup, (self._semlock.name,), exitpriority=0) @@ -85,7 +85,7 @@ def _after_fork(obj): def _cleanup(name): from .resource_tracker import unregister sem_unlink(name) - unregister(name) + unregister(name, "semaphore") def _make_methods(self): self.acquire = self._semlock.acquire From c0eeec98569447222eebe0dd7a768e1dcf0d6ce6 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Thu, 9 May 2019 18:05:24 +0200 Subject: [PATCH 04/19] CLN fixup --- Lib/multiprocessing/resource_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index b17cd1e3ca920e..d34855ea0f55db 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -207,6 +207,6 @@ def main(fd): try: _CLEANUP_FUNCS[rtype](name) except Exception as e: - warnings.warn('resource_tracker: %s: %r' % (name, e)) + warnings.warn('resource_tracker: %r: %s' % (name, e)) finally: pass From 7ec1276547b2156f789b4e46ba283ece0fef1fc5 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Thu, 9 May 2019 18:20:33 +0200 Subject: [PATCH 05/19] TST resource_tracker for all resource types --- Lib/test/_test_multiprocessing.py | 89 +++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 27 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 22636f546b4e8b..93b5ae7def7b06 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -41,6 +41,7 @@ import multiprocessing.queues from multiprocessing import util +from multiprocessing import resource_tracker try: from multiprocessing import reduction @@ -88,6 +89,10 @@ def join_process(process): support.join_thread(process, timeout=TIMEOUT) +def _resource_unlink(name, rtype): + resource_tracker._CLEANUP_FUNCS[rtype](name) + + # # Constants # @@ -4811,36 +4816,66 @@ def test_resource_tracker(self): # import subprocess cmd = '''if 1: - import multiprocessing as mp, time, os + import time, os, tempfile + import multiprocessing as mp + from multiprocessing import resource_tracker + mp.set_start_method("spawn") - lock1 = mp.Lock() - lock2 = mp.Lock() - os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n") - os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n") + rand = tempfile._RandomNameSequence() + + + def create_and_register_resource(rtype): + if rtype == "folder": + folder_name = tempfile.mkdtemp() + # tempfile.mkdtemp() does not register the created folder + # automatically. + resource_tracker.register(folder_name, rtype) + return None, folder_name + elif rtype == "semaphore": + # create a Lock using the low-level _multiprocessing to + # separate resource creation from tracking registration. + lock = mp.Lock() + return lock, lock._semlock.name + else: + raise ValueError( + "Resource type {{}} not understood".format(rtype)) + + + resource1, rname1 = create_and_register_resource("{rtype}") + resource2, rname2 = create_and_register_resource("{rtype}") + + os.write({w}, rname1.encode("ascii") + b"\\n") + os.write({w}, rname2.encode("ascii") + b"\\n") + time.sleep(10) ''' - r, w = os.pipe() - p = subprocess.Popen([sys.executable, - '-E', '-c', cmd % (w, w)], - pass_fds=[w], - stderr=subprocess.PIPE) - os.close(w) - with open(r, 'rb', closefd=True) as f: - name1 = f.readline().rstrip().decode('ascii') - name2 = f.readline().rstrip().decode('ascii') - _multiprocessing.sem_unlink(name1) - p.terminate() - p.wait() - time.sleep(2.0) - with self.assertRaises(OSError) as ctx: - _multiprocessing.sem_unlink(name2) - # docs say it should be ENOENT, but OSX seems to give EINVAL - self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) - err = p.stderr.read().decode('utf-8') - p.stderr.close() - expected = 'resource_tracker: There appear to be 2 leaked semaphores' - self.assertRegex(err, expected) - self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) + for rtype in resource_tracker._CLEANUP_FUNCS: + with self.subTest(rtype=rtype): + r, w = os.pipe() + p = subprocess.Popen([sys.executable, + '-E', '-c', cmd.format(w=w, rtype=rtype)], + pass_fds=[w], + stderr=subprocess.PIPE) + os.close(w) + with open(r, 'rb', closefd=True) as f: + name1 = f.readline().rstrip().decode('ascii') + name2 = f.readline().rstrip().decode('ascii') + _resource_unlink(name1, rtype) + p.terminate() + p.wait() + time.sleep(2.0) + with self.assertRaises(OSError) as ctx: + _resource_unlink(name2, rtype) + # docs say it should be ENOENT, but OSX seems to give EINVAL + self.assertIn( + ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) + err = p.stderr.read().decode('utf-8') + p.stderr.close() + expected = ( + 'resource_tracker: There appear to be 2 leaked {}s'.format( + rtype)) + self.assertRegex(err, expected) + self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) def check_resource_tracker_death(self, signum, should_die): # bpo-31310: if the semaphore tracker process has died, it should From fc32d116f247e31e33ff36914be993431efb04e9 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Thu, 9 May 2019 19:14:29 +0200 Subject: [PATCH 06/19] ENH track shared memory segments --- Lib/multiprocessing/resource_tracker.py | 4 +++- Lib/multiprocessing/shared_memory.py | 13 +++++++++++++ Lib/test/_test_multiprocessing.py | 4 ++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index d34855ea0f55db..c44282182ac0c1 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -22,6 +22,7 @@ import threading import warnings import _multiprocessing +import _posixshmem from . import spawn from . import util @@ -33,7 +34,8 @@ _CLEANUP_FUNCS = { 'folder': shutil.rmtree, - 'semaphore': _multiprocessing.sem_unlink + 'semaphore': _multiprocessing.sem_unlink, + 'shared_memory': _posixshmem.shm_unlink } diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py index ebc88858762e8c..c85b7d2cac91c3 100644 --- a/Lib/multiprocessing/shared_memory.py +++ b/Lib/multiprocessing/shared_memory.py @@ -15,6 +15,8 @@ import struct import secrets +from . import util + if os.name == "nt": import _winapi _USE_POSIX = False @@ -113,6 +115,11 @@ def __init__(self, name=None, create=False, size=0): self.unlink() raise + from .resource_tracker import register + register(self._name, "shared_memory") + util.Finalize(self, SharedMemory._cleanup, (self._name,), + exitpriority=0) + else: # Windows Named Shared Memory @@ -233,6 +240,12 @@ def unlink(self): if _USE_POSIX and self._name: _posixshmem.shm_unlink(self._name) + @staticmethod + def _cleanup(name): + from .resource_tracker import unregister + _posixshmem.shm_unlink(name) + unregister(name, "shared_memory") + _encoding = "utf8" diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 93b5ae7def7b06..e3f574bcada730 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4819,6 +4819,7 @@ def test_resource_tracker(self): import time, os, tempfile import multiprocessing as mp from multiprocessing import resource_tracker + from multiprocessing.shared_memory import SharedMemory mp.set_start_method("spawn") rand = tempfile._RandomNameSequence() @@ -4836,6 +4837,9 @@ def create_and_register_resource(rtype): # separate resource creation from tracking registration. lock = mp.Lock() return lock, lock._semlock.name + elif rtype == "shared_memory": + sm = SharedMemory(create=True, size=10) + return sm, sm._name else: raise ValueError( "Resource type {{}} not understood".format(rtype)) From add3b919273cb4104f690b1f500827fbc1451ec3 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Thu, 9 May 2019 20:06:51 +0200 Subject: [PATCH 07/19] CLN surround posix-only imports and functions --- Lib/test/_test_multiprocessing.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index e3f574bcada730..04316138e45a6e 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -41,7 +41,6 @@ import multiprocessing.queues from multiprocessing import util -from multiprocessing import resource_tracker try: from multiprocessing import reduction @@ -89,8 +88,11 @@ def join_process(process): support.join_thread(process, timeout=TIMEOUT) -def _resource_unlink(name, rtype): - resource_tracker._CLEANUP_FUNCS[rtype](name) +if os.name == "posix": + from multiprocessing import resource_tracker + + def _resource_unlink(name, rtype): + resource_tracker._CLEANUP_FUNCS[rtype](name) # From d1c7f0e39b235c282e090e9fcb3eb34d7cce39eb Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Thu, 9 May 2019 20:08:27 +0200 Subject: [PATCH 08/19] CLN stale comment --- Lib/test/_test_multiprocessing.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 04316138e45a6e..080162790d9e6a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4835,8 +4835,6 @@ def create_and_register_resource(rtype): resource_tracker.register(folder_name, rtype) return None, folder_name elif rtype == "semaphore": - # create a Lock using the low-level _multiprocessing to - # separate resource creation from tracking registration. lock = mp.Lock() return lock, lock._semlock.name elif rtype == "shared_memory": From a2a0d5569454bbc5496ca4ca30f478f10f719d39 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" Date: Thu, 9 May 2019 18:12:56 +0000 Subject: [PATCH 09/19] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst diff --git a/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst b/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst new file mode 100644 index 00000000000000..5eaf0a032cc3c2 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst @@ -0,0 +1 @@ +The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. \ No newline at end of file From bb818d2d13793a2c7c561769745f6407fdc180b5 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 00:13:13 +0200 Subject: [PATCH 10/19] FIX remove Finalizers for shared_memory --- Lib/multiprocessing/shared_memory.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py index c85b7d2cac91c3..fc772869e700cd 100644 --- a/Lib/multiprocessing/shared_memory.py +++ b/Lib/multiprocessing/shared_memory.py @@ -117,8 +117,6 @@ def __init__(self, name=None, create=False, size=0): from .resource_tracker import register register(self._name, "shared_memory") - util.Finalize(self, SharedMemory._cleanup, (self._name,), - exitpriority=0) else: @@ -238,13 +236,9 @@ def unlink(self): called once (and only once) across all processes which have access to the shared memory block.""" if _USE_POSIX and self._name: + from .resource_tracker import unregister _posixshmem.shm_unlink(self._name) - - @staticmethod - def _cleanup(name): - from .resource_tracker import unregister - _posixshmem.shm_unlink(name) - unregister(name, "shared_memory") + unregister(self._name, "shared_memory") _encoding = "utf8" From 52f21c1f8fd83c12507ff8934b36287766fe9d3a Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 00:46:05 +0200 Subject: [PATCH 11/19] CLN unused import --- Lib/multiprocessing/shared_memory.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py index fc772869e700cd..184e36704baaeb 100644 --- a/Lib/multiprocessing/shared_memory.py +++ b/Lib/multiprocessing/shared_memory.py @@ -15,8 +15,6 @@ import struct import secrets -from . import util - if os.name == "nt": import _winapi _USE_POSIX = False From 0837e0965e81d8002d8c616a27ea7df731d41a95 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 13:53:43 +0200 Subject: [PATCH 12/19] CLN improve SemaphoreTracker messages --- Lib/multiprocessing/resource_tracker.py | 2 +- Lib/test/_test_multiprocessing.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index c44282182ac0c1..39d114ff92d97b 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -197,7 +197,7 @@ def main(fd): if rtype_cache: try: warnings.warn('resource_tracker: There appear to be %d ' - 'leaked %ss to clean up at shutdown' % + 'leaked %s objects to clean up at shutdown' % (len(rtype_cache), rtype)) except Exception: pass diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 080162790d9e6a..a176ce8865b870 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4875,8 +4875,8 @@ def create_and_register_resource(rtype): ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) err = p.stderr.read().decode('utf-8') p.stderr.close() - expected = ( - 'resource_tracker: There appear to be 2 leaked {}s'.format( + expected = ('resource_tracker: There appear to be 2 leaked {} ' + 'objects'.format( rtype)) self.assertRegex(err, expected) self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) From 1d7ecdb2f925a6b43640ee537205b28796559d94 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 18:28:18 +0200 Subject: [PATCH 13/19] CLN typo --- Lib/multiprocessing/resource_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 39d114ff92d97b..8322b7391261e0 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -11,7 +11,7 @@ # This is important because there may be system limits for such resources: for # instance, the system only supports a limited number of named semaphores, and # shared-memory segments live in the RAM. If a python process leaks such a -# resoucre, this resource will not be removed till the next reboot. Without +# resource, this resource will not be removed till the next reboot. Without # this resource tracker process, "killall python" would probably leave unlinked # resources. From 33c865f74b0795706bb98a32c026cb11681687a0 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 18:49:26 +0200 Subject: [PATCH 14/19] CLN do not enable folder tracking, cleaner PROBE --- Lib/multiprocessing/resource_tracker.py | 4 ++-- Lib/test/_test_multiprocessing.py | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 8322b7391261e0..81d75fd2fb672b 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -33,7 +33,7 @@ _IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) _CLEANUP_FUNCS = { - 'folder': shutil.rmtree, + 'noop': lambda: None, 'semaphore': _multiprocessing.sem_unlink, 'shared_memory': _posixshmem.shm_unlink } @@ -119,7 +119,7 @@ def _check_alive(self): try: # We cannot use send here as it calls ensure_running, creating # a cycle. - os.write(self._fd, b'PROBE:0:folder\n') + os.write(self._fd, b'PROBE:0:noop\n') except OSError: return False else: diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index a176ce8865b870..9e70280ad1385e 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4855,6 +4855,9 @@ def create_and_register_resource(rtype): ''' for rtype in resource_tracker._CLEANUP_FUNCS: with self.subTest(rtype=rtype): + if rtype == "noop": + # Artefact resource type used by the resource_tracker + continue r, w = os.pipe() p = subprocess.Popen([sys.executable, '-E', '-c', cmd.format(w=w, rtype=rtype)], From c211357e11bc1afe8ff90b6b6b0ace23e54f275d Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 19:35:16 +0200 Subject: [PATCH 15/19] CLN stale code, use f-strings --- Lib/multiprocessing/resource_tracker.py | 7 +++---- Lib/test/_test_multiprocessing.py | 8 +------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 81d75fd2fb672b..e67e0b213eb948 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -16,7 +16,6 @@ # resources. import os -import shutil import signal import sys import threading @@ -174,9 +173,9 @@ def main(fd): cmd, name, rtype = line.strip().decode('ascii').split(':') cleanup_func = _CLEANUP_FUNCS.get(rtype, None) if cleanup_func is None: - raise ValueError('Cannot register for automatic ' - 'cleanup: unknown resource type {}' - .format(name, rtype)) + raise ValueError( + f'Cannot register {name} for automatic cleanup: ' + f'unknown resource type {rtype}') if cmd == 'REGISTER': cache[rtype].add(name) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 9e70280ad1385e..1af38ab261b15a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4828,13 +4828,7 @@ def test_resource_tracker(self): def create_and_register_resource(rtype): - if rtype == "folder": - folder_name = tempfile.mkdtemp() - # tempfile.mkdtemp() does not register the created folder - # automatically. - resource_tracker.register(folder_name, rtype) - return None, folder_name - elif rtype == "semaphore": + if rtype == "semaphore": lock = mp.Lock() return lock, lock._semlock.name elif rtype == "shared_memory": From c9cfac5497a2016b584350661c2bffb5403e6289 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 15:34:08 +0200 Subject: [PATCH 16/19] TST test end to end shared-memory cleanup --- Lib/test/_test_multiprocessing.py | 112 ++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 1af38ab261b15a..428d35f5519a33 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3879,6 +3879,118 @@ def test_shared_memory_ShareableList_pickling(self): deserialized_sl.shm.close() sl.shm.close() + @unittest.skipUnless(os.name == 'posix', 'posix shared memory') + def test_posix_shared_memory_cleaned_after_process_termination(self): + import subprocess + cmd = '''if 1: + import os, time, sys + from multiprocessing import shared_memory + + sm = shared_memory.SharedMemory(create=True, size=10) + sm._buf[0] = 1 + os.write({w}, sm._name.encode("ascii") + b"\\n") + time.sleep(100) + ''' + cmd2 = '''if 1: + from multiprocessing import shared_memory + import os, time, sys + + sm = shared_memory.SharedMemory("{smm_name}", create=False) + os.write({w}, str(sm._buf[0]).encode("ascii") + b"\\n") + time.sleep(100) + ''' + r, w = os.pipe() + p = subprocess.Popen([sys.executable, '-E', '-c', cmd.format(w=w)], + pass_fds=[w], + stderr=subprocess.PIPE) + + f = open(r, 'rb') + name = f.readline().rstrip().decode('ascii') + + p2 = subprocess.Popen([sys.executable, '-E', '-c', + cmd2.format(smm_name=name, w=w)], + pass_fds=[w], stderr=subprocess.PIPE) + + first_elem = int(f.readline().rstrip().decode('ascii')) + f.close() + os.close(w) + self.assertEqual(first_elem, 1) + + p.terminate() + p2.terminate() + p.wait() + p2.wait() + time.sleep(1.0) + + import _posixshmem + with self.assertRaises(OSError) as ctx: + _posixshmem.shm_unlink(name) + + @unittest.skipUnless(sys.platform == 'win32', 'Windows shared memory') + def test_windows_shared_memory_cleaned_after_process_termination(self): + import subprocess + cmd = '''if 1: + import os, time, sys + import msvcrt, _winapi + from multiprocessing import shared_memory, reduction + + + source_process = _winapi.OpenProcess( + _winapi.PROCESS_DUP_HANDLE, False, {parent_pid}) + w = msvcrt.open_osfhandle(reduction.duplicate( + {w}, source_process=source_process), 0) + + sm = shared_memory.SharedMemory(create=True, size=10) + sm._buf[0] = 1 + os.write(w, sm._name.encode("ascii") + b"\\n") + time.sleep(100) + ''' + cmd2 = '''if 1: + import msvcrt, _winapi + import os, time, sys + from multiprocessing import shared_memory, reduction + + + source_process = _winapi.OpenProcess( + _winapi.PROCESS_DUP_HANDLE, False, {parent_pid}) + w = msvcrt.open_osfhandle(reduction.duplicate( + {w}, source_process=source_process), 0) + + sm = shared_memory.SharedMemory("{smm_name}", create=False) + os.write(w, str(sm._buf[0]).encode("ascii") + b"\\n") + time.sleep(100) + ''' + r, w = os.pipe() + import msvcrt + p = subprocess.Popen([sys.executable, '-E', '-c', + cmd.format(w=msvcrt.get_osfhandle(w), + parent_pid=os.getpid())], + stderr=subprocess.PIPE) + + f = open(r, 'rb') + name = f.readline().rstrip().decode('ascii') + + p2 = subprocess.Popen([sys.executable, '-E', '-c', + cmd2.format(smm_name=name, + w=msvcrt.get_osfhandle(w), + parent_pid=os.getpid())], + stderr=subprocess.PIPE) + + first_elem = int(f.readline().rstrip().decode('ascii')) + f.close() + os.close(w) + self.assertEqual(first_elem, 1) + + p.terminate() + p2.terminate() + p.wait() + p2.wait() + time.sleep(1.0) + with self.assertRaises(FileNotFoundError): + from multiprocessing import shared_memory + smm = shared_memory.SharedMemory(name, create=False) + + # # # From 1c5341abcee3bab31d3f67676d69f9d8e0fe971a Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 18:06:44 +0200 Subject: [PATCH 17/19] DOC more comments --- Lib/test/_test_multiprocessing.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 428d35f5519a33..b76b0839f8ead9 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3886,6 +3886,7 @@ def test_posix_shared_memory_cleaned_after_process_termination(self): import os, time, sys from multiprocessing import shared_memory + # Create a shared_memory segment, and send the segment name sm = shared_memory.SharedMemory(create=True, size=10) sm._buf[0] = 1 os.write({w}, sm._name.encode("ascii") + b"\\n") @@ -3895,6 +3896,7 @@ def test_posix_shared_memory_cleaned_after_process_termination(self): from multiprocessing import shared_memory import os, time, sys + # Send back the first byte of sm sm = shared_memory.SharedMemory("{smm_name}", create=False) os.write({w}, str(sm._buf[0]).encode("ascii") + b"\\n") time.sleep(100) @@ -3911,16 +3913,20 @@ def test_posix_shared_memory_cleaned_after_process_termination(self): cmd2.format(smm_name=name, w=w)], pass_fds=[w], stderr=subprocess.PIPE) + # make sure that the shared memory segment is correctly seen among + # different processes first_elem = int(f.readline().rstrip().decode('ascii')) + self.assertEqual(first_elem, 1) f.close() os.close(w) - self.assertEqual(first_elem, 1) + # killing abruptly processes holding reference to a shared memory + # segment should not leak the given memory segment. p.terminate() p2.terminate() p.wait() p2.wait() - time.sleep(1.0) + time.sleep(1.0) # wait for the OS to collect the segment import _posixshmem with self.assertRaises(OSError) as ctx: @@ -3935,6 +3941,7 @@ def test_windows_shared_memory_cleaned_after_process_termination(self): from multiprocessing import shared_memory, reduction + # Create a shared_memory segment, and send the segment name source_process = _winapi.OpenProcess( _winapi.PROCESS_DUP_HANDLE, False, {parent_pid}) w = msvcrt.open_osfhandle(reduction.duplicate( @@ -3956,6 +3963,7 @@ def test_windows_shared_memory_cleaned_after_process_termination(self): w = msvcrt.open_osfhandle(reduction.duplicate( {w}, source_process=source_process), 0) + # Send back the first byte of sm sm = shared_memory.SharedMemory("{smm_name}", create=False) os.write(w, str(sm._buf[0]).encode("ascii") + b"\\n") time.sleep(100) @@ -3976,11 +3984,15 @@ def test_windows_shared_memory_cleaned_after_process_termination(self): parent_pid=os.getpid())], stderr=subprocess.PIPE) + # make sure that the shared memory segment is correctly seen among + # different processes first_elem = int(f.readline().rstrip().decode('ascii')) f.close() os.close(w) self.assertEqual(first_elem, 1) + # killing abruptly processes holding reference to a shared memory + # segment should not leak the given memory segment. p.terminate() p2.terminate() p.wait() From 907ffb9a3a218b8a9582042ab82f446c8f6bb78d Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 22:17:47 +0200 Subject: [PATCH 18/19] CLN simplifications --- Lib/test/_test_multiprocessing.py | 110 ++---------------------------- 1 file changed, 6 insertions(+), 104 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index b76b0839f8ead9..4751a00c276253 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3879,130 +3879,32 @@ def test_shared_memory_ShareableList_pickling(self): deserialized_sl.shm.close() sl.shm.close() - @unittest.skipUnless(os.name == 'posix', 'posix shared memory') def test_posix_shared_memory_cleaned_after_process_termination(self): import subprocess + from multiprocessing import shared_memory cmd = '''if 1: import os, time, sys from multiprocessing import shared_memory # Create a shared_memory segment, and send the segment name sm = shared_memory.SharedMemory(create=True, size=10) - sm._buf[0] = 1 - os.write({w}, sm._name.encode("ascii") + b"\\n") + sys.stdout.write(sm._name + '\\n') + sys.stdout.flush() time.sleep(100) ''' - cmd2 = '''if 1: - from multiprocessing import shared_memory - import os, time, sys - - # Send back the first byte of sm - sm = shared_memory.SharedMemory("{smm_name}", create=False) - os.write({w}, str(sm._buf[0]).encode("ascii") + b"\\n") - time.sleep(100) - ''' - r, w = os.pipe() - p = subprocess.Popen([sys.executable, '-E', '-c', cmd.format(w=w)], - pass_fds=[w], - stderr=subprocess.PIPE) - - f = open(r, 'rb') - name = f.readline().rstrip().decode('ascii') - - p2 = subprocess.Popen([sys.executable, '-E', '-c', - cmd2.format(smm_name=name, w=w)], - pass_fds=[w], stderr=subprocess.PIPE) - - # make sure that the shared memory segment is correctly seen among - # different processes - first_elem = int(f.readline().rstrip().decode('ascii')) - self.assertEqual(first_elem, 1) - f.close() - os.close(w) + p = subprocess.Popen([sys.executable, '-E', '-c', cmd], + stdout=subprocess.PIPE) + name = p.stdout.readline().strip().decode() # killing abruptly processes holding reference to a shared memory # segment should not leak the given memory segment. p.terminate() - p2.terminate() p.wait() - p2.wait() time.sleep(1.0) # wait for the OS to collect the segment - import _posixshmem - with self.assertRaises(OSError) as ctx: - _posixshmem.shm_unlink(name) - - @unittest.skipUnless(sys.platform == 'win32', 'Windows shared memory') - def test_windows_shared_memory_cleaned_after_process_termination(self): - import subprocess - cmd = '''if 1: - import os, time, sys - import msvcrt, _winapi - from multiprocessing import shared_memory, reduction - - - # Create a shared_memory segment, and send the segment name - source_process = _winapi.OpenProcess( - _winapi.PROCESS_DUP_HANDLE, False, {parent_pid}) - w = msvcrt.open_osfhandle(reduction.duplicate( - {w}, source_process=source_process), 0) - - sm = shared_memory.SharedMemory(create=True, size=10) - sm._buf[0] = 1 - os.write(w, sm._name.encode("ascii") + b"\\n") - time.sleep(100) - ''' - cmd2 = '''if 1: - import msvcrt, _winapi - import os, time, sys - from multiprocessing import shared_memory, reduction - - - source_process = _winapi.OpenProcess( - _winapi.PROCESS_DUP_HANDLE, False, {parent_pid}) - w = msvcrt.open_osfhandle(reduction.duplicate( - {w}, source_process=source_process), 0) - - # Send back the first byte of sm - sm = shared_memory.SharedMemory("{smm_name}", create=False) - os.write(w, str(sm._buf[0]).encode("ascii") + b"\\n") - time.sleep(100) - ''' - r, w = os.pipe() - import msvcrt - p = subprocess.Popen([sys.executable, '-E', '-c', - cmd.format(w=msvcrt.get_osfhandle(w), - parent_pid=os.getpid())], - stderr=subprocess.PIPE) - - f = open(r, 'rb') - name = f.readline().rstrip().decode('ascii') - - p2 = subprocess.Popen([sys.executable, '-E', '-c', - cmd2.format(smm_name=name, - w=msvcrt.get_osfhandle(w), - parent_pid=os.getpid())], - stderr=subprocess.PIPE) - - # make sure that the shared memory segment is correctly seen among - # different processes - first_elem = int(f.readline().rstrip().decode('ascii')) - f.close() - os.close(w) - self.assertEqual(first_elem, 1) - - # killing abruptly processes holding reference to a shared memory - # segment should not leak the given memory segment. - p.terminate() - p2.terminate() - p.wait() - p2.wait() - time.sleep(1.0) with self.assertRaises(FileNotFoundError): - from multiprocessing import shared_memory smm = shared_memory.SharedMemory(name, create=False) - # # # From a2822a5d1a59340f0a614ccf4481e3b660c572d8 Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 22:20:12 +0200 Subject: [PATCH 19/19] CLN rename test --- Lib/test/_test_multiprocessing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 4751a00c276253..6624554558aac8 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3879,7 +3879,7 @@ def test_shared_memory_ShareableList_pickling(self): deserialized_sl.shm.close() sl.shm.close() - def test_posix_shared_memory_cleaned_after_process_termination(self): + def test_shared_memory_cleaned_after_process_termination(self): import subprocess from multiprocessing import shared_memory cmd = '''if 1: