Skip to content

Commit 4a7dd30

Browse files
tzickelpitrou
authored andcommitted
[2.7] bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-9686)
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
1 parent 8d3b0f4 commit 4a7dd30

File tree

3 files changed

+45
-19
lines changed

3 files changed

+45
-19
lines changed

Lib/multiprocessing/pool.py

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,9 @@ def __init__(self, processes=None, initializer=None, initargs=(),
162162

163163
self._worker_handler = threading.Thread(
164164
target=Pool._handle_workers,
165-
args=(self, )
165+
args=(self._cache, self._processes, self._pool, self.Process,
166+
self._inqueue, self._outqueue, self._initializer,
167+
self._initargs, self._maxtasksperchild, self._taskqueue)
166168
)
167169
self._worker_handler.daemon = True
168170
self._worker_handler._state = RUN
@@ -194,42 +196,56 @@ def __init__(self, processes=None, initializer=None, initargs=(),
194196
exitpriority=15
195197
)
196198

197-
def _join_exited_workers(self):
199+
@staticmethod
200+
def _join_exited_workers(pool):
198201
"""Cleanup after any worker processes which have exited due to reaching
199202
their specified lifetime. Returns True if any workers were cleaned up.
200203
"""
201204
cleaned = False
202-
for i in reversed(range(len(self._pool))):
203-
worker = self._pool[i]
205+
for i in reversed(range(len(pool))):
206+
worker = pool[i]
204207
if worker.exitcode is not None:
205208
# worker exited
206209
debug('cleaning up worker %d' % i)
207210
worker.join()
208211
cleaned = True
209-
del self._pool[i]
212+
del pool[i]
210213
return cleaned
211214

212215
def _repopulate_pool(self):
216+
return self._repopulate_pool_static(self._processes, self._pool,
217+
self.Process, self._inqueue,
218+
self._outqueue, self._initializer,
219+
self._initargs,
220+
self._maxtasksperchild)
221+
222+
@staticmethod
223+
def _repopulate_pool_static(processes, pool, Process, inqueue, outqueue,
224+
initializer, initargs, maxtasksperchild):
213225
"""Bring the number of pool processes up to the specified number,
214226
for use after reaping workers which have exited.
215227
"""
216-
for i in range(self._processes - len(self._pool)):
217-
w = self.Process(target=worker,
218-
args=(self._inqueue, self._outqueue,
219-
self._initializer,
220-
self._initargs, self._maxtasksperchild)
221-
)
222-
self._pool.append(w)
228+
for i in range(processes - len(pool)):
229+
w = Process(target=worker,
230+
args=(inqueue, outqueue,
231+
initializer,
232+
initargs, maxtasksperchild)
233+
)
234+
pool.append(w)
223235
w.name = w.name.replace('Process', 'PoolWorker')
224236
w.daemon = True
225237
w.start()
226238
debug('added worker')
227239

228-
def _maintain_pool(self):
240+
@staticmethod
241+
def _maintain_pool(processes, pool, Process, inqueue, outqueue,
242+
initializer, initargs, maxtasksperchild):
229243
"""Clean up any exited workers and start replacements for them.
230244
"""
231-
if self._join_exited_workers():
232-
self._repopulate_pool()
245+
if Pool._join_exited_workers(pool):
246+
Pool._repopulate_pool_static(processes, pool, Process, inqueue,
247+
outqueue, initializer, initargs,
248+
maxtasksperchild)
233249

234250
def _setup_queues(self):
235251
from .queues import SimpleQueue
@@ -319,16 +335,18 @@ def map_async(self, func, iterable, chunksize=None, callback=None):
319335
return result
320336

321337
@staticmethod
322-
def _handle_workers(pool):
338+
def _handle_workers(cache, processes, pool, Process, inqueue, outqueue,
339+
initializer, initargs, maxtasksperchild, taskqueue):
323340
thread = threading.current_thread()
324341

325342
# Keep maintaining workers until the cache gets drained, unless the pool
326343
# is terminated.
327-
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
328-
pool._maintain_pool()
344+
while thread._state == RUN or (cache and thread._state != TERMINATE):
345+
Pool._maintain_pool(processes, pool, Process, inqueue, outqueue,
346+
initializer, initargs, maxtasksperchild)
329347
time.sleep(0.1)
330348
# send sentinel to stop workers
331-
pool._taskqueue.put(None)
349+
taskqueue.put(None)
332350
debug('worker handler exiting')
333351

334352
@staticmethod

Lib/test/test_multiprocessing.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,6 +1359,13 @@ def test_release_task_refs(self):
13591359
# they were released too.
13601360
self.assertEqual(CountedObject.n_instances, 0)
13611361

1362+
def test_del_pool(self):
1363+
p = self.Pool(1)
1364+
wr = weakref.ref(p)
1365+
del p
1366+
gc.collect()
1367+
self.assertIsNone(wr())
1368+
13621369

13631370
def unpickleable_result():
13641371
return lambda: 42
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.

0 commit comments

Comments
 (0)