Skip to content

Commit f22cc69

Browse files
pierreglaser authored and pitrou committed
bpo-36867: Make semaphore_tracker track other system resources (GH-13222)
The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. Patch by Pierre Glaser.
1 parent d0d64ad commit f22cc69

File tree

9 files changed

+210
-125
lines changed

9 files changed

+210
-125
lines changed

Lib/multiprocessing/forkserver.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from . import connection
1212
from . import process
1313
from .context import reduction
14-
from . import semaphore_tracker
14+
from . import resource_tracker
1515
from . import spawn
1616
from . import util
1717

@@ -69,7 +69,7 @@ def connect_to_new_process(self, fds):
6969
parent_r, child_w = os.pipe()
7070
child_r, parent_w = os.pipe()
7171
allfds = [child_r, child_w, self._forkserver_alive_fd,
72-
semaphore_tracker.getfd()]
72+
resource_tracker.getfd()]
7373
allfds += fds
7474
try:
7575
reduction.sendfds(client, allfds)
@@ -90,7 +90,7 @@ def ensure_running(self):
9090
ensure_running() will do nothing.
9191
'''
9292
with self._lock:
93-
semaphore_tracker.ensure_running()
93+
resource_tracker.ensure_running()
9494
if self._forkserver_pid is not None:
9595
# forkserver was launched before, is it still running?
9696
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
@@ -290,7 +290,7 @@ def _serve_one(child_r, fds, unused_fds, handlers):
290290
os.close(fd)
291291

292292
(_forkserver._forkserver_alive_fd,
293-
semaphore_tracker._semaphore_tracker._fd,
293+
resource_tracker._resource_tracker._fd,
294294
*_forkserver._inherited_fds) = fds
295295

296296
# Run process object received over pipe

Lib/multiprocessing/popen_spawn_posix.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ def duplicate_for_child(self, fd):
3636
return fd
3737

3838
def _launch(self, process_obj):
39-
from . import semaphore_tracker
40-
tracker_fd = semaphore_tracker.getfd()
39+
from . import resource_tracker
40+
tracker_fd = resource_tracker.getfd()
4141
self._fds.append(tracker_fd)
4242
prep_data = spawn.get_preparation_data(process_obj._name)
4343
fp = io.BytesIO()
Lib/multiprocessing/resource_tracker.py (renamed from Lib/multiprocessing/semaphore_tracker.py)

Lines changed: 75 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,27 @@
1+
###############################################################################
2+
# Server process to keep track of unlinked resources (like shared memory
3+
# segments, semaphores etc.) and clean them.
14
#
25
# On Unix we run a server process which keeps track of unlinked
3-
# semaphores. The server ignores SIGINT and SIGTERM and reads from a
6+
# resources. The server ignores SIGINT and SIGTERM and reads from a
47
# pipe. Every other process of the program has a copy of the writable
58
# end of the pipe, so we get EOF when all other processes have exited.
6-
# Then the server process unlinks any remaining semaphore names.
7-
#
8-
# This is important because the system only supports a limited number
9-
# of named semaphores, and they will not be automatically removed till
10-
# the next reboot. Without this semaphore tracker process, "killall
11-
# python" would probably leave unlinked semaphores.
9+
# Then the server process unlinks any remaining resource names.
1210
#
11+
# This is important because there may be system limits for such resources: for
12+
# instance, the system only supports a limited number of named semaphores, and
13+
# shared-memory segments live in the RAM. If a python process leaks such a
14+
# resource, this resource will not be removed till the next reboot. Without
15+
# this resource tracker process, "killall python" would probably leave unlinked
16+
# resources.
1317

1418
import os
1519
import signal
1620
import sys
1721
import threading
1822
import warnings
1923
import _multiprocessing
24+
import _posixshmem
2025

2126
from . import spawn
2227
from . import util
@@ -26,8 +31,14 @@
2631
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
2732
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
2833

34+
_CLEANUP_FUNCS = {
35+
'noop': lambda: None,
36+
'semaphore': _multiprocessing.sem_unlink,
37+
'shared_memory': _posixshmem.shm_unlink
38+
}
39+
2940

30-
class SemaphoreTracker(object):
41+
class ResourceTracker(object):
3142

3243
def __init__(self):
3344
self._lock = threading.Lock()
@@ -39,13 +50,13 @@ def getfd(self):
3950
return self._fd
4051

4152
def ensure_running(self):
42-
'''Make sure that semaphore tracker process is running.
53+
'''Make sure that resource tracker process is running.
4354
4455
This can be run from any process. Usually a child process will use
45-
the semaphore created by its parent.'''
56+
the resource created by its parent.'''
4657
with self._lock:
4758
if self._fd is not None:
48-
# semaphore tracker was launched before, is it still running?
59+
# resource tracker was launched before, is it still running?
4960
if self._check_alive():
5061
# => still alive
5162
return
@@ -55,24 +66,24 @@ def ensure_running(self):
5566
# Clean-up to avoid dangling processes.
5667
try:
5768
# _pid can be None if this process is a child from another
58-
# python process, which has started the semaphore_tracker.
69+
# python process, which has started the resource_tracker.
5970
if self._pid is not None:
6071
os.waitpid(self._pid, 0)
6172
except ChildProcessError:
62-
# The semaphore_tracker has already been terminated.
73+
# The resource_tracker has already been terminated.
6374
pass
6475
self._fd = None
6576
self._pid = None
6677

67-
warnings.warn('semaphore_tracker: process died unexpectedly, '
68-
'relaunching. Some semaphores might leak.')
78+
warnings.warn('resource_tracker: process died unexpectedly, '
79+
'relaunching. Some resources might leak.')
6980

7081
fds_to_pass = []
7182
try:
7283
fds_to_pass.append(sys.stderr.fileno())
7384
except Exception:
7485
pass
75-
cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
86+
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
7687
r, w = os.pipe()
7788
try:
7889
fds_to_pass.append(r)
@@ -107,23 +118,23 @@ def _check_alive(self):
107118
try:
108119
# We cannot use send here as it calls ensure_running, creating
109120
# a cycle.
110-
os.write(self._fd, b'PROBE:0\n')
121+
os.write(self._fd, b'PROBE:0:noop\n')
111122
except OSError:
112123
return False
113124
else:
114125
return True
115126

116-
def register(self, name):
117-
'''Register name of semaphore with semaphore tracker.'''
118-
self._send('REGISTER', name)
127+
def register(self, name, rtype):
128+
'''Register name of resource with resource tracker.'''
129+
self._send('REGISTER', name, rtype)
119130

120-
def unregister(self, name):
121-
'''Unregister name of semaphore with semaphore tracker.'''
122-
self._send('UNREGISTER', name)
131+
def unregister(self, name, rtype):
132+
'''Unregister name of resource with resource tracker.'''
133+
self._send('UNREGISTER', name, rtype)
123134

124-
def _send(self, cmd, name):
135+
def _send(self, cmd, name, rtype):
125136
self.ensure_running()
126-
msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
137+
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
127138
if len(name) > 512:
128139
# posix guarantees that writes to a pipe of less than PIPE_BUF
129140
# bytes are atomic, and that PIPE_BUF >= 512
@@ -133,14 +144,14 @@ def _send(self, cmd, name):
133144
nbytes, len(msg))
134145

135146

136-
_semaphore_tracker = SemaphoreTracker()
137-
ensure_running = _semaphore_tracker.ensure_running
138-
register = _semaphore_tracker.register
139-
unregister = _semaphore_tracker.unregister
140-
getfd = _semaphore_tracker.getfd
147+
_resource_tracker = ResourceTracker()
148+
ensure_running = _resource_tracker.ensure_running
149+
register = _resource_tracker.register
150+
unregister = _resource_tracker.unregister
151+
getfd = _resource_tracker.getfd
141152

142153
def main(fd):
143-
'''Run semaphore tracker.'''
154+
'''Run resource tracker.'''
144155
# protect the process from ^C and "killall python" etc
145156
signal.signal(signal.SIGINT, signal.SIG_IGN)
146157
signal.signal(signal.SIGTERM, signal.SIG_IGN)
@@ -153,18 +164,24 @@ def main(fd):
153164
except Exception:
154165
pass
155166

156-
cache = set()
167+
cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
157168
try:
158-
# keep track of registered/unregistered semaphores
169+
# keep track of registered/unregistered resources
159170
with open(fd, 'rb') as f:
160171
for line in f:
161172
try:
162-
cmd, name = line.strip().split(b':')
163-
if cmd == b'REGISTER':
164-
cache.add(name)
165-
elif cmd == b'UNREGISTER':
166-
cache.remove(name)
167-
elif cmd == b'PROBE':
173+
cmd, name, rtype = line.strip().decode('ascii').split(':')
174+
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
175+
if cleanup_func is None:
176+
raise ValueError(
177+
f'Cannot register {name} for automatic cleanup: '
178+
f'unknown resource type {rtype}')
179+
180+
if cmd == 'REGISTER':
181+
cache[rtype].add(name)
182+
elif cmd == 'UNREGISTER':
183+
cache[rtype].remove(name)
184+
elif cmd == 'PROBE':
168185
pass
169186
else:
170187
raise RuntimeError('unrecognized command %r' % cmd)
@@ -174,23 +191,23 @@ def main(fd):
174191
except:
175192
pass
176193
finally:
177-
# all processes have terminated; cleanup any remaining semaphores
178-
if cache:
179-
try:
180-
warnings.warn('semaphore_tracker: There appear to be %d '
181-
'leaked semaphores to clean up at shutdown' %
182-
len(cache))
183-
except Exception:
184-
pass
185-
for name in cache:
186-
# For some reason the process which created and registered this
187-
# semaphore has failed to unregister it. Presumably it has died.
188-
# We therefore unlink it.
189-
try:
190-
name = name.decode('ascii')
194+
# all processes have terminated; cleanup any remaining resources
195+
for rtype, rtype_cache in cache.items():
196+
if rtype_cache:
191197
try:
192-
_multiprocessing.sem_unlink(name)
193-
except Exception as e:
194-
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
195-
finally:
196-
pass
198+
warnings.warn('resource_tracker: There appear to be %d '
199+
'leaked %s objects to clean up at shutdown' %
200+
(len(rtype_cache), rtype))
201+
except Exception:
202+
pass
203+
for name in rtype_cache:
204+
# For some reason the process which created and registered this
205+
# resource has failed to unregister it. Presumably it has
206+
# died. We therefore unlink it.
207+
try:
208+
try:
209+
_CLEANUP_FUNCS[rtype](name)
210+
except Exception as e:
211+
warnings.warn('resource_tracker: %r: %s' % (name, e))
212+
finally:
213+
pass

Lib/multiprocessing/shared_memory.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ def __init__(self, name=None, create=False, size=0):
113113
self.unlink()
114114
raise
115115

116+
from .resource_tracker import register
117+
register(self._name, "shared_memory")
118+
116119
else:
117120

118121
# Windows Named Shared Memory
@@ -231,7 +234,9 @@ def unlink(self):
231234
called once (and only once) across all processes which have access
232235
to the shared memory block."""
233236
if _USE_POSIX and self._name:
237+
from .resource_tracker import unregister
234238
_posixshmem.shm_unlink(self._name)
239+
unregister(self._name, "shared_memory")
235240

236241

237242
_encoding = "utf8"

Lib/multiprocessing/spawn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
111111
_winapi.CloseHandle(source_process)
112112
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
113113
else:
114-
from . import semaphore_tracker
115-
semaphore_tracker._semaphore_tracker._fd = tracker_fd
114+
from . import resource_tracker
115+
resource_tracker._resource_tracker._fd = tracker_fd
116116
fd = pipe_handle
117117
exitcode = _main(fd)
118118
sys.exit(exitcode)

Lib/multiprocessing/synchronize.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,16 +76,16 @@ def _after_fork(obj):
7676
# We only get here if we are on Unix with forking
7777
# disabled. When the object is garbage collected or the
7878
# process shuts down we unlink the semaphore name
79-
from .semaphore_tracker import register
80-
register(self._semlock.name)
79+
from .resource_tracker import register
80+
register(self._semlock.name, "semaphore")
8181
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
8282
exitpriority=0)
8383

8484
@staticmethod
8585
def _cleanup(name):
86-
from .semaphore_tracker import unregister
86+
from .resource_tracker import unregister
8787
sem_unlink(name)
88-
unregister(name)
88+
unregister(name, "semaphore")
8989

9090
def _make_methods(self):
9191
self.acquire = self._semlock.acquire

0 commit comments

Comments
 (0)