Skip to content

Commit eb38ee0

Browse files
authored
[3.6] Revert "bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450) (GH-9677)" (GH-10969)
This reverts commit 07b96a9.
1 parent f7fe18a commit eb38ee0

File tree

3 files changed

+27
-57
lines changed

3 files changed

+27
-57
lines changed

Lib/multiprocessing/pool.py

Lines changed: 24 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,8 @@ class Pool(object):
147147
'''
148148
_wrap_exception = True
149149

150-
@staticmethod
151-
def Process(ctx, *args, **kwds):
152-
return ctx.Process(*args, **kwds)
150+
def Process(self, *args, **kwds):
151+
return self._ctx.Process(*args, **kwds)
153152

154153
def __init__(self, processes=None, initializer=None, initargs=(),
155154
maxtasksperchild=None, context=None):
@@ -176,15 +175,13 @@ def __init__(self, processes=None, initializer=None, initargs=(),
176175

177176
self._worker_handler = threading.Thread(
178177
target=Pool._handle_workers,
179-
args=(self._cache, self._taskqueue, self._ctx, self.Process,
180-
self._processes, self._pool, self._inqueue, self._outqueue,
181-
self._initializer, self._initargs, self._maxtasksperchild,
182-
self._wrap_exception)
178+
args=(self, )
183179
)
184180
self._worker_handler.daemon = True
185181
self._worker_handler._state = RUN
186182
self._worker_handler.start()
187183

184+
188185
self._task_handler = threading.Thread(
189186
target=Pool._handle_tasks,
190187
args=(self._taskqueue, self._quick_put, self._outqueue,
@@ -210,62 +207,43 @@ def __init__(self, processes=None, initializer=None, initargs=(),
210207
exitpriority=15
211208
)
212209

213-
@staticmethod
214-
def _join_exited_workers(pool):
210+
def _join_exited_workers(self):
215211
"""Cleanup after any worker processes which have exited due to reaching
216212
their specified lifetime. Returns True if any workers were cleaned up.
217213
"""
218214
cleaned = False
219-
for i in reversed(range(len(pool))):
220-
worker = pool[i]
215+
for i in reversed(range(len(self._pool))):
216+
worker = self._pool[i]
221217
if worker.exitcode is not None:
222218
# worker exited
223219
util.debug('cleaning up worker %d' % i)
224220
worker.join()
225221
cleaned = True
226-
del pool[i]
222+
del self._pool[i]
227223
return cleaned
228224

229225
def _repopulate_pool(self):
230-
return self._repopulate_pool_static(self._ctx, self.Process,
231-
self._processes,
232-
self._pool, self._inqueue,
233-
self._outqueue, self._initializer,
234-
self._initargs,
235-
self._maxtasksperchild,
236-
self._wrap_exception)
237-
238-
@staticmethod
239-
def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
240-
outqueue, initializer, initargs,
241-
maxtasksperchild, wrap_exception):
242226
"""Bring the number of pool processes up to the specified number,
243227
for use after reaping workers which have exited.
244228
"""
245-
for i in range(processes - len(pool)):
246-
w = Process(ctx, target=worker,
247-
args=(inqueue, outqueue,
248-
initializer,
249-
initargs, maxtasksperchild,
250-
wrap_exception)
251-
)
252-
pool.append(w)
229+
for i in range(self._processes - len(self._pool)):
230+
w = self.Process(target=worker,
231+
args=(self._inqueue, self._outqueue,
232+
self._initializer,
233+
self._initargs, self._maxtasksperchild,
234+
self._wrap_exception)
235+
)
236+
self._pool.append(w)
253237
w.name = w.name.replace('Process', 'PoolWorker')
254238
w.daemon = True
255239
w.start()
256240
util.debug('added worker')
257241

258-
@staticmethod
259-
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
260-
initializer, initargs, maxtasksperchild,
261-
wrap_exception):
242+
def _maintain_pool(self):
262243
"""Clean up any exited workers and start replacements for them.
263244
"""
264-
if Pool._join_exited_workers(pool):
265-
Pool._repopulate_pool_static(ctx, Process, processes, pool,
266-
inqueue, outqueue, initializer,
267-
initargs, maxtasksperchild,
268-
wrap_exception)
245+
if self._join_exited_workers():
246+
self._repopulate_pool()
269247

270248
def _setup_queues(self):
271249
self._inqueue = self._ctx.SimpleQueue()
@@ -418,20 +396,16 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
418396
return result
419397

420398
@staticmethod
421-
def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
422-
inqueue, outqueue, initializer, initargs,
423-
maxtasksperchild, wrap_exception):
399+
def _handle_workers(pool):
424400
thread = threading.current_thread()
425401

426402
# Keep maintaining workers until the cache gets drained, unless the pool
427403
# is terminated.
428-
while thread._state == RUN or (cache and thread._state != TERMINATE):
429-
Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
430-
outqueue, initializer, initargs,
431-
maxtasksperchild, wrap_exception)
404+
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
405+
pool._maintain_pool()
432406
time.sleep(0.1)
433407
# send sentinel to stop workers
434-
taskqueue.put(None)
408+
pool._taskqueue.put(None)
435409
util.debug('worker handler exiting')
436410

437411
@staticmethod
@@ -807,7 +781,7 @@ class ThreadPool(Pool):
807781
_wrap_exception = False
808782

809783
@staticmethod
810-
def Process(ctx, *args, **kwds):
784+
def Process(*args, **kwds):
811785
from .dummy import Process
812786
return Process(*args, **kwds)
813787

Lib/test/_test_multiprocessing.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2286,13 +2286,6 @@ def test_release_task_refs(self):
22862286
# they were released too.
22872287
self.assertEqual(CountedObject.n_instances, 0)
22882288

2289-
@support.reap_threads
2290-
def test_del_pool(self):
2291-
p = self.Pool(1)
2292-
wr = weakref.ref(p)
2293-
del p
2294-
gc.collect()
2295-
self.assertIsNone(wr())
22962289

22972290
def raising():
22982291
raise KeyError("key")
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
REVERT: Fix a reference issue inside multiprocessing.Pool that caused the
2+
pool to remain alive if it was deleted without being closed or terminated
3+
explicitly.

0 commit comments

Comments
 (0)