Skip to content

Commit c286945

Browse files
committed
bpo-34172: multiprocessing.Pool leaks resources after being deleted
Due to a circular reference inside the Pool implementation, there was a resource leak if the object was not closed/terminated. This patch removes the circular reference, and now the code acts like the documentation.
1 parent c206f0d commit c286945

File tree

3 files changed

+59
-25
lines changed

3 files changed

+59
-25
lines changed

Lib/multiprocessing/pool.py

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,9 @@ class Pool(object):
149149
'''
150150
_wrap_exception = True
151151

152-
def Process(self, *args, **kwds):
153-
return self._ctx.Process(*args, **kwds)
152+
@staticmethod
153+
def Process(ctx, *args, **kwds):
154+
return ctx.Process(*args, **kwds)
154155

155156
def __init__(self, processes=None, initializer=None, initargs=(),
156157
maxtasksperchild=None, context=None):
@@ -177,13 +178,15 @@ def __init__(self, processes=None, initializer=None, initargs=(),
177178

178179
self._worker_handler = threading.Thread(
179180
target=Pool._handle_workers,
180-
args=(self, )
181+
args=(self._cache, self._taskqueue, self._ctx, self.Process,
182+
self._processes, self._pool, self._inqueue, self._outqueue,
183+
self._initializer, self._initargs, self._maxtasksperchild,
184+
self._wrap_exception)
181185
)
182186
self._worker_handler.daemon = True
183187
self._worker_handler._state = RUN
184188
self._worker_handler.start()
185189

186-
187190
self._task_handler = threading.Thread(
188191
target=Pool._handle_tasks,
189192
args=(self._taskqueue, self._quick_put, self._outqueue,
@@ -209,43 +212,61 @@ def __init__(self, processes=None, initializer=None, initargs=(),
209212
exitpriority=15
210213
)
211214

212-
def _join_exited_workers(self):
215+
@staticmethod
216+
def _join_exited_workers(pool):
213217
"""Cleanup after any worker processes which have exited due to reaching
214218
their specified lifetime. Returns True if any workers were cleaned up.
215219
"""
216220
cleaned = False
217-
for i in reversed(range(len(self._pool))):
218-
worker = self._pool[i]
221+
for i in reversed(range(len(pool))):
222+
worker = pool[i]
219223
if worker.exitcode is not None:
220224
# worker exited
221225
util.debug('cleaning up worker %d' % i)
222226
worker.join()
223227
cleaned = True
224-
del self._pool[i]
228+
del pool[i]
225229
return cleaned
226230

227231
def _repopulate_pool(self):
232+
return self._repopulate_pool_static(self._ctx, self.Process,
233+
self._processes,
234+
self._pool, self._inqueue,
235+
self._outqueue, self._initializer,
236+
self._initargs,
237+
self._maxtasksperchild,
238+
self._wrap_exception)
239+
240+
@staticmethod
241+
def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
242+
outqueue, initializer, initargs,
243+
maxtasksperchild, wrap_exception):
228244
"""Bring the number of pool processes up to the specified number,
229245
for use after reaping workers which have exited.
230246
"""
231-
for i in range(self._processes - len(self._pool)):
232-
w = self.Process(target=worker,
233-
args=(self._inqueue, self._outqueue,
234-
self._initializer,
235-
self._initargs, self._maxtasksperchild,
236-
self._wrap_exception)
237-
)
238-
self._pool.append(w)
247+
for i in range(processes - len(pool)):
248+
w = Process(ctx, target=worker,
249+
args=(inqueue, outqueue,
250+
initializer,
251+
initargs, maxtasksperchild,
252+
wrap_exception)
253+
)
254+
pool.append(w)
239255
w.name = w.name.replace('Process', 'PoolWorker')
240256
w.daemon = True
241257
w.start()
242258
util.debug('added worker')
243259

244-
def _maintain_pool(self):
260+
@classmethod
261+
def _maintain_pool(cls, ctx, Process, processes, pool, inqueue, outqueue,
262+
initializer, initargs, maxtasksperchild,
263+
wrap_exception):
245264
"""Clean up any exited workers and start replacements for them.
246265
"""
247-
if self._join_exited_workers():
248-
self._repopulate_pool()
266+
if cls._join_exited_workers(pool):
267+
cls._repopulate_pool_static(ctx, Process, processes, pool, inqueue,
268+
outqueue, initializer, initargs,
269+
maxtasksperchild, wrap_exception)
249270

250271
def _setup_queues(self):
251272
self._inqueue = self._ctx.SimpleQueue()
@@ -402,17 +423,21 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
402423
)
403424
return result
404425

405-
@staticmethod
406-
def _handle_workers(pool):
426+
@classmethod
427+
def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, pool,
428+
inqueue, outqueue, initializer, initargs,
429+
maxtasksperchild, wrap_exception):
407430
thread = threading.current_thread()
408431

409432
# Keep maintaining workers until the cache gets drained, unless the pool
410433
# is terminated.
411-
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
412-
pool._maintain_pool()
434+
while thread._state == RUN or (cache and thread._state != TERMINATE):
435+
cls._maintain_pool(ctx, Process, processes, pool, inqueue,
436+
outqueue, initializer, initargs,
437+
maxtasksperchild, wrap_exception)
413438
time.sleep(0.1)
414439
# send sentinel to stop workers
415-
pool._taskqueue.put(None)
440+
taskqueue.put(None)
416441
util.debug('worker handler exiting')
417442

418443
@staticmethod
@@ -794,7 +819,7 @@ class ThreadPool(Pool):
794819
_wrap_exception = False
795820

796821
@staticmethod
797-
def Process(*args, **kwds):
822+
def Process(ctx, *args, **kwds):
798823
from .dummy import Process
799824
return Process(*args, **kwds)
800825

Lib/test/_test_multiprocessing.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2527,6 +2527,12 @@ def test_release_task_refs(self):
25272527
# they were released too.
25282528
self.assertEqual(CountedObject.n_instances, 0)
25292529

2530+
def test_del_pool(self):
2531+
p = self.Pool(1)
2532+
wr = weakref.ref(p)
2533+
del p
2534+
gc.collect()
2535+
self.assertIsNone(wr())
25302536

25312537
def raising():
25322538
raise KeyError("key")
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed a circular reference issue inside the multiprocessing.Pool
2+
implementation that caused a resource leak when not closing/terminating the
3+
pool before deleting it.

0 commit comments

Comments
 (0)