Skip to content

[3.7] bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450) #9676

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit on Oct 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 50 additions & 24 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ class Pool(object):
'''
_wrap_exception = True

def Process(self, *args, **kwds):
return self._ctx.Process(*args, **kwds)
@staticmethod
def Process(ctx, *args, **kwds):
return ctx.Process(*args, **kwds)

def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):
Expand All @@ -177,13 +178,15 @@ def __init__(self, processes=None, initializer=None, initargs=(),

self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self, )
args=(self._cache, self._taskqueue, self._ctx, self.Process,
self._processes, self._pool, self._inqueue, self._outqueue,
self._initializer, self._initargs, self._maxtasksperchild,
self._wrap_exception)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()


self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue,
Expand All @@ -209,43 +212,62 @@ def __init__(self, processes=None, initializer=None, initargs=(),
exitpriority=15
)

def _join_exited_workers(self):
@staticmethod
def _join_exited_workers(pool):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
for i in reversed(range(len(self._pool))):
worker = self._pool[i]
for i in reversed(range(len(pool))):
worker = pool[i]
if worker.exitcode is not None:
# worker exited
util.debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
del self._pool[i]
del pool[i]
return cleaned

def _repopulate_pool(self):
return self._repopulate_pool_static(self._ctx, self.Process,
self._processes,
self._pool, self._inqueue,
self._outqueue, self._initializer,
self._initargs,
self._maxtasksperchild,
self._wrap_exception)

@staticmethod
def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
for i in range(self._processes - len(self._pool)):
w = self.Process(target=worker,
args=(self._inqueue, self._outqueue,
self._initializer,
self._initargs, self._maxtasksperchild,
self._wrap_exception)
)
self._pool.append(w)
for i in range(processes - len(pool)):
w = Process(ctx, target=worker,
args=(inqueue, outqueue,
initializer,
initargs, maxtasksperchild,
wrap_exception)
)
pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
util.debug('added worker')

def _maintain_pool(self):
@staticmethod
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
initializer, initargs, maxtasksperchild,
wrap_exception):
"""Clean up any exited workers and start replacements for them.
"""
if self._join_exited_workers():
self._repopulate_pool()
if Pool._join_exited_workers(pool):
Pool._repopulate_pool_static(ctx, Process, processes, pool,
inqueue, outqueue, initializer,
initargs, maxtasksperchild,
wrap_exception)

def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue()
Expand Down Expand Up @@ -403,16 +425,20 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
return result

@staticmethod
def _handle_workers(pool):
def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
inqueue, outqueue, initializer, initargs,
maxtasksperchild, wrap_exception):
thread = threading.current_thread()

# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
pool._maintain_pool()
while thread._state == RUN or (cache and thread._state != TERMINATE):
Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception)
time.sleep(0.1)
# send sentinel to stop workers
pool._taskqueue.put(None)
taskqueue.put(None)
util.debug('worker handler exiting')

@staticmethod
Expand Down Expand Up @@ -794,7 +820,7 @@ class ThreadPool(Pool):
_wrap_exception = False

@staticmethod
def Process(*args, **kwds):
def Process(ctx, *args, **kwds):
from .dummy import Process
return Process(*args, **kwds)

Expand Down
6 changes: 6 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2551,6 +2551,12 @@ def test_release_task_refs(self):
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)

def test_del_pool(self):
p = self.Pool(1)
wr = weakref.ref(p)
del p
gc.collect()
self.assertIsNone(wr())

def raising():
raise KeyError("key")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a reference leak inside multiprocessing.Pool: the worker handler thread held a reference to the pool, which caused the pool to remain alive if it was deleted without being closed or terminated explicitly.