Skip to content

Commit 28bb669

Browse files
committed
Use executor weakref to access semaphore
1 parent 04e5927 commit 28bb669

File tree

1 file changed

+8
-9
lines changed

1 file changed

+8
-9
lines changed

Lib/concurrent/futures/process.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,7 @@ def _sendback_result(result_queue, work_id, result=None, exception=None):
209209
result_queue.put(_ResultItem(work_id, exception=exc))
210210

211211

212-
def _process_worker(call_queue, result_queue, initializer, initargs,
213-
idle_worker_semaphore):
212+
def _process_worker(call_queue, result_queue, initializer, initargs):
214213
"""Evaluates calls from call_queue and places the results in result_queue.
215214
216215
This worker is run in a separate process.
@@ -222,8 +221,6 @@ def _process_worker(call_queue, result_queue, initializer, initargs,
222221
to by the worker.
223222
initializer: A callable initializer, or None
224223
initargs: A tuple of args for the initializer
225-
idle_worker_semaphore: A multiprocessing.Semaphore that is used to
226-
prevent new workers from being spawned when there are idle workers.
227224
"""
228225
if initializer is not None:
229226
try:
@@ -252,8 +249,6 @@ def _process_worker(call_queue, result_queue, initializer, initargs,
252249
# open files or shared memory that is not needed anymore
253250
del call_item
254251

255-
# increment idle process count after worker finishes job
256-
idle_worker_semaphore.release()
257252

258253
class _ExecutorManagerThread(threading.Thread):
259254
"""Manages the communication between this process and the worker processes.
@@ -323,6 +318,12 @@ def run(self):
323318
# while waiting on new results.
324319
del result_item
325320

321+
# attempt to increment idle process count
322+
executor = self.executor_reference()
323+
if executor is not None:
324+
executor._idle_worker_semaphore.release()
325+
del executor
326+
326327
if self.is_shutting_down():
327328
self.flag_executor_shutting_down()
328329

@@ -656,8 +657,7 @@ def _adjust_process_count(self):
656657
args=(self._call_queue,
657658
self._result_queue,
658659
self._initializer,
659-
self._initargs,
660-
self._idle_worker_semaphore))
660+
self._initargs))
661661
p.start()
662662
self._processes[p.pid] = p
663663

@@ -730,7 +730,6 @@ def shutdown(self, wait=True, *, cancel_futures=False):
730730
self._call_queue = None
731731
self._result_queue = None
732732
self._processes = None
733-
self._idle_worker_semaphore = None
734733

735734
if self._executor_manager_thread_wakeup:
736735
self._executor_manager_thread_wakeup = None

0 commit comments

Comments
 (0)