Skip to content

bpo-36867: Make semaphore_tracker track other system resources. #13222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Lib/multiprocessing/forkserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from . import connection
from . import process
from .context import reduction
from . import semaphore_tracker
from . import resource_tracker
from . import spawn
from . import util

Expand Down Expand Up @@ -69,7 +69,7 @@ def connect_to_new_process(self, fds):
parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe()
allfds = [child_r, child_w, self._forkserver_alive_fd,
semaphore_tracker.getfd()]
resource_tracker.getfd()]
allfds += fds
try:
reduction.sendfds(client, allfds)
Expand All @@ -90,7 +90,7 @@ def ensure_running(self):
ensure_running() will do nothing.
'''
with self._lock:
semaphore_tracker.ensure_running()
resource_tracker.ensure_running()
if self._forkserver_pid is not None:
# forkserver was launched before, is it still running?
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
Expand Down Expand Up @@ -290,7 +290,7 @@ def _serve_one(child_r, fds, unused_fds, handlers):
os.close(fd)

(_forkserver._forkserver_alive_fd,
semaphore_tracker._semaphore_tracker._fd,
resource_tracker._resource_tracker._fd,
*_forkserver._inherited_fds) = fds

# Run process object received over pipe
Expand Down
4 changes: 2 additions & 2 deletions Lib/multiprocessing/popen_spawn_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def duplicate_for_child(self, fd):
return fd

def _launch(self, process_obj):
from . import semaphore_tracker
tracker_fd = semaphore_tracker.getfd()
from . import resource_tracker
tracker_fd = resource_tracker.getfd()
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
###############################################################################
# Server process to keep track of unlinked resources (like shared memory
# segments, semaphores etc.) and clean them.
#
# On Unix we run a server process which keeps track of unlinked
# semaphores. The server ignores SIGINT and SIGTERM and reads from a
# resources. The server ignores SIGINT and SIGTERM and reads from a
# pipe. Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
# Then the server process unlinks any remaining semaphore names.
#
# This is important because the system only supports a limited number
# of named semaphores, and they will not be automatically removed till
# the next reboot. Without this semaphore tracker process, "killall
# python" would probably leave unlinked semaphores.
# Then the server process unlinks any remaining resource names.
#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments occupy RAM. If a Python process leaks such a
# resource, it will not be removed until the next reboot. Without this
# resource tracker process, "killall python" would probably leave such
# resources behind without ever unlinking them.

import os
import signal
import sys
import threading
import warnings
import _multiprocessing
import _posixshmem

from . import spawn
from . import util
Expand All @@ -26,8 +31,14 @@
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

_CLEANUP_FUNCS = {
'noop': lambda: None,
'semaphore': _multiprocessing.sem_unlink,
'shared_memory': _posixshmem.shm_unlink
}


class SemaphoreTracker(object):
class ResourceTracker(object):

def __init__(self):
self._lock = threading.Lock()
Expand All @@ -39,13 +50,13 @@ def getfd(self):
return self._fd

def ensure_running(self):
'''Make sure that semaphore tracker process is running.
'''Make sure that resource tracker process is running.

This can be run from any process. Usually a child process will use
the semaphore created by its parent.'''
the resource tracker created by its parent.'''
with self._lock:
if self._fd is not None:
# semaphore tracker was launched before, is it still running?
# resource tracker was launched before, is it still running?
if self._check_alive():
# => still alive
return
Expand All @@ -55,24 +66,24 @@ def ensure_running(self):
# Clean-up to avoid dangling processes.
try:
# _pid can be None if this process is a child from another
# python process, which has started the semaphore_tracker.
# python process, which has started the resource_tracker.
if self._pid is not None:
os.waitpid(self._pid, 0)
except ChildProcessError:
# The semaphore_tracker has already been terminated.
# The resource_tracker has already been terminated.
pass
self._fd = None
self._pid = None

warnings.warn('semaphore_tracker: process died unexpectedly, '
'relaunching. Some semaphores might leak.')
warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')

fds_to_pass = []
try:
fds_to_pass.append(sys.stderr.fileno())
except Exception:
pass
cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
r, w = os.pipe()
try:
fds_to_pass.append(r)
Expand Down Expand Up @@ -107,23 +118,23 @@ def _check_alive(self):
try:
# We cannot use send here as it calls ensure_running, creating
# a cycle.
os.write(self._fd, b'PROBE:0\n')
os.write(self._fd, b'PROBE:0:noop\n')
except OSError:
return False
else:
return True

def register(self, name):
'''Register name of semaphore with semaphore tracker.'''
self._send('REGISTER', name)
def register(self, name, rtype):
'''Register name of resource with resource tracker.'''
self._send('REGISTER', name, rtype)

def unregister(self, name):
'''Unregister name of semaphore with semaphore tracker.'''
self._send('UNREGISTER', name)
def unregister(self, name, rtype):
'''Unregister name of resource with resource tracker.'''
self._send('UNREGISTER', name, rtype)

def _send(self, cmd, name):
def _send(self, cmd, name, rtype):
self.ensure_running()
msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
if len(name) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
Expand All @@ -133,14 +144,14 @@ def _send(self, cmd, name):
nbytes, len(msg))


_semaphore_tracker = SemaphoreTracker()
ensure_running = _semaphore_tracker.ensure_running
register = _semaphore_tracker.register
unregister = _semaphore_tracker.unregister
getfd = _semaphore_tracker.getfd
_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
register = _resource_tracker.register
unregister = _resource_tracker.unregister
getfd = _resource_tracker.getfd

def main(fd):
'''Run semaphore tracker.'''
'''Run resource tracker.'''
# protect the process from ^C and "killall python" etc
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
Expand All @@ -153,18 +164,24 @@ def main(fd):
except Exception:
pass

cache = set()
cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
try:
# keep track of registered/unregistered semaphores
# keep track of registered/unregistered resources
with open(fd, 'rb') as f:
for line in f:
try:
cmd, name = line.strip().split(b':')
if cmd == b'REGISTER':
cache.add(name)
elif cmd == b'UNREGISTER':
cache.remove(name)
elif cmd == b'PROBE':
cmd, name, rtype = line.strip().decode('ascii').split(':')
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
if cleanup_func is None:
raise ValueError(
f'Cannot register {name} for automatic cleanup: '
f'unknown resource type {rtype}')

if cmd == 'REGISTER':
cache[rtype].add(name)
elif cmd == 'UNREGISTER':
cache[rtype].remove(name)
elif cmd == 'PROBE':
pass
else:
raise RuntimeError('unrecognized command %r' % cmd)
Expand All @@ -174,23 +191,23 @@ def main(fd):
except:
pass
finally:
# all processes have terminated; cleanup any remaining semaphores
if cache:
try:
warnings.warn('semaphore_tracker: There appear to be %d '
'leaked semaphores to clean up at shutdown' %
len(cache))
except Exception:
pass
for name in cache:
# For some reason the process which created and registered this
# semaphore has failed to unregister it. Presumably it has died.
# We therefore unlink it.
try:
name = name.decode('ascii')
# all processes have terminated; cleanup any remaining resources
for rtype, rtype_cache in cache.items():
if rtype_cache:
try:
_multiprocessing.sem_unlink(name)
except Exception as e:
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
finally:
pass
warnings.warn('resource_tracker: There appear to be %d '
'leaked %s objects to clean up at shutdown' %
(len(rtype_cache), rtype))
except Exception:
pass
for name in rtype_cache:
# For some reason the process which created and registered this
# resource has failed to unregister it. Presumably it has
# died. We therefore unlink it.
try:
try:
_CLEANUP_FUNCS[rtype](name)
except Exception as e:
warnings.warn('resource_tracker: %r: %s' % (name, e))
finally:
pass
5 changes: 5 additions & 0 deletions Lib/multiprocessing/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ def __init__(self, name=None, create=False, size=0):
self.unlink()
raise

from .resource_tracker import register
register(self._name, "shared_memory")

else:

# Windows Named Shared Memory
Expand Down Expand Up @@ -231,7 +234,9 @@ def unlink(self):
called once (and only once) across all processes which have access
to the shared memory block."""
if _USE_POSIX and self._name:
from .resource_tracker import unregister
_posixshmem.shm_unlink(self._name)
unregister(self._name, "shared_memory")


_encoding = "utf8"
Expand Down
4 changes: 2 additions & 2 deletions Lib/multiprocessing/spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
_winapi.CloseHandle(source_process)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
else:
from . import semaphore_tracker
semaphore_tracker._semaphore_tracker._fd = tracker_fd
from . import resource_tracker
resource_tracker._resource_tracker._fd = tracker_fd
fd = pipe_handle
exitcode = _main(fd)
sys.exit(exitcode)
Expand Down
8 changes: 4 additions & 4 deletions Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ def _after_fork(obj):
# We only get here if we are on Unix with forking
# disabled. When the object is garbage collected or the
# process shuts down we unlink the semaphore name
from .semaphore_tracker import register
register(self._semlock.name)
from .resource_tracker import register
register(self._semlock.name, "semaphore")
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
exitpriority=0)

@staticmethod
def _cleanup(name):
from .semaphore_tracker import unregister
from .resource_tracker import unregister
sem_unlink(name)
unregister(name)
unregister(name, "semaphore")

def _make_methods(self):
self.acquire = self._semlock.acquire
Expand Down
Loading