Skip to content

Commit 358fc87

Browse files
authored
Revert "[2.7] bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-9686)" (GH-10970)
This reverts commit 4a7dd30.
1 parent 40ef5b7 commit 358fc87

File tree

3 files changed

+19
-45
lines changed

3 files changed

+19
-45
lines changed

Lib/multiprocessing/pool.py

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
162162

163163
self._worker_handler = threading.Thread(
164164
target=Pool._handle_workers,
165-
args=(self._cache, self._processes, self._pool, self.Process,
166-
self._inqueue, self._outqueue, self._initializer,
167-
self._initargs, self._maxtasksperchild, self._taskqueue)
165+
args=(self, )
168166
)
169167
self._worker_handler.daemon = True
170168
self._worker_handler._state = RUN
@@ -196,56 +194,42 @@ def __init__(self, processes=None, initializer=None, initargs=(),
196194
exitpriority=15
197195
)
198196

199-
@staticmethod
200-
def _join_exited_workers(pool):
197+
def _join_exited_workers(self):
201198
"""Cleanup after any worker processes which have exited due to reaching
202199
their specified lifetime. Returns True if any workers were cleaned up.
203200
"""
204201
cleaned = False
205-
for i in reversed(range(len(pool))):
206-
worker = pool[i]
202+
for i in reversed(range(len(self._pool))):
203+
worker = self._pool[i]
207204
if worker.exitcode is not None:
208205
# worker exited
209206
debug('cleaning up worker %d' % i)
210207
worker.join()
211208
cleaned = True
212-
del pool[i]
209+
del self._pool[i]
213210
return cleaned
214211

215212
def _repopulate_pool(self):
216-
return self._repopulate_pool_static(self._processes, self._pool,
217-
self.Process, self._inqueue,
218-
self._outqueue, self._initializer,
219-
self._initargs,
220-
self._maxtasksperchild)
221-
222-
@staticmethod
223-
def _repopulate_pool_static(processes, pool, Process, inqueue, outqueue,
224-
initializer, initargs, maxtasksperchild):
225213
"""Bring the number of pool processes up to the specified number,
226214
for use after reaping workers which have exited.
227215
"""
228-
for i in range(processes - len(pool)):
229-
w = Process(target=worker,
230-
args=(inqueue, outqueue,
231-
initializer,
232-
initargs, maxtasksperchild)
233-
)
234-
pool.append(w)
216+
for i in range(self._processes - len(self._pool)):
217+
w = self.Process(target=worker,
218+
args=(self._inqueue, self._outqueue,
219+
self._initializer,
220+
self._initargs, self._maxtasksperchild)
221+
)
222+
self._pool.append(w)
235223
w.name = w.name.replace('Process', 'PoolWorker')
236224
w.daemon = True
237225
w.start()
238226
debug('added worker')
239227

240-
@staticmethod
241-
def _maintain_pool(processes, pool, Process, inqueue, outqueue,
242-
initializer, initargs, maxtasksperchild):
228+
def _maintain_pool(self):
243229
"""Clean up any exited workers and start replacements for them.
244230
"""
245-
if Pool._join_exited_workers(pool):
246-
Pool._repopulate_pool_static(processes, pool, Process, inqueue,
247-
outqueue, initializer, initargs,
248-
maxtasksperchild)
231+
if self._join_exited_workers():
232+
self._repopulate_pool()
249233

250234
def _setup_queues(self):
251235
from .queues import SimpleQueue
@@ -335,18 +319,16 @@ def map_async(self, func, iterable, chunksize=None, callback=None):
335319
return result
336320

337321
@staticmethod
338-
def _handle_workers(cache, processes, pool, Process, inqueue, outqueue,
339-
initializer, initargs, maxtasksperchild, taskqueue):
322+
def _handle_workers(pool):
340323
thread = threading.current_thread()
341324

342325
# Keep maintaining workers until the cache gets drained, unless the pool
343326
# is terminated.
344-
while thread._state == RUN or (cache and thread._state != TERMINATE):
345-
Pool._maintain_pool(processes, pool, Process, inqueue, outqueue,
346-
initializer, initargs, maxtasksperchild)
327+
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
328+
pool._maintain_pool()
347329
time.sleep(0.1)
348330
# send sentinel to stop workers
349-
taskqueue.put(None)
331+
pool._taskqueue.put(None)
350332
debug('worker handler exiting')
351333

352334
@staticmethod

Lib/test/test_multiprocessing.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,13 +1359,6 @@ def test_release_task_refs(self):
13591359
# they were released too.
13601360
self.assertEqual(CountedObject.n_instances, 0)
13611361

1362-
def test_del_pool(self):
1363-
p = self.Pool(1)
1364-
wr = weakref.ref(p)
1365-
del p
1366-
gc.collect()
1367-
self.assertIsNone(wr())
1368-
13691362

13701363
def unpickleable_result():
13711364
return lambda: 42

Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)