Skip to content

Commit 1ac6e37

Browse files
authored
bpo-39207: Spawn workers on demand in ProcessPoolExecutor (GH-19453)
Roughly based on 904e34d, but with a few substantial differences. /cc @pitrou @brianquinlan
1 parent c12375a commit 1ac6e37

File tree

4 files changed

+63
-6
lines changed

4 files changed

+63
-6
lines changed

Doc/whatsnew/3.9.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,11 @@ and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves
206206
compatibility with subinterpreters and predictability in their shutdown
207207
processes. (Contributed by Kyle Stanley in :issue:`39812`.)
208208

209+
Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
210+
demand, only when there are no available idle workers to reuse. This reduces
211+
startup overhead and the amount of CPU time lost to idle workers.
212+
(Contributed by Kyle Stanley in :issue:`39207`.)
213+
209214
curses
210215
------
211216

Lib/concurrent/futures/process.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,12 @@ def run(self):
318318
# while waiting on new results.
319319
del result_item
320320

321+
# attempt to increment idle process count
322+
executor = self.executor_reference()
323+
if executor is not None:
324+
executor._idle_worker_semaphore.release()
325+
del executor
326+
321327
if self.is_shutting_down():
322328
self.flag_executor_shutting_down()
323329

@@ -601,6 +607,7 @@ def __init__(self, max_workers=None, mp_context=None,
601607
# Shutdown is a two-step process.
602608
self._shutdown_thread = False
603609
self._shutdown_lock = threading.Lock()
610+
self._idle_worker_semaphore = threading.Semaphore(0)
604611
self._broken = False
605612
self._queue_count = 0
606613
self._pending_work_items = {}
@@ -633,14 +640,18 @@ def __init__(self, max_workers=None, mp_context=None,
633640
def _start_executor_manager_thread(self):
634641
if self._executor_manager_thread is None:
635642
# Start the processes so that their sentinels are known.
636-
self._adjust_process_count()
637643
self._executor_manager_thread = _ExecutorManagerThread(self)
638644
self._executor_manager_thread.start()
639645
_threads_wakeups[self._executor_manager_thread] = \
640646
self._executor_manager_thread_wakeup
641647

642648
def _adjust_process_count(self):
643-
for _ in range(len(self._processes), self._max_workers):
649+
# if there's an idle process, we don't need to spawn a new one.
650+
if self._idle_worker_semaphore.acquire(blocking=False):
651+
return
652+
653+
process_count = len(self._processes)
654+
if process_count < self._max_workers:
644655
p = self._mp_context.Process(
645656
target=_process_worker,
646657
args=(self._call_queue,
@@ -669,6 +680,7 @@ def submit(self, fn, /, *args, **kwargs):
669680
# Wake up queue management thread
670681
self._executor_manager_thread_wakeup.wakeup()
671682

683+
self._adjust_process_count()
672684
self._start_executor_manager_thread()
673685
return f
674686
submit.__doc__ = _base.Executor.submit.__doc__

Lib/test/test_concurrent_futures.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -486,10 +486,16 @@ def _prime_executor(self):
486486
pass
487487

488488
def test_processes_terminate(self):
489-
self.executor.submit(mul, 21, 2)
490-
self.executor.submit(mul, 6, 7)
491-
self.executor.submit(mul, 3, 14)
492-
self.assertEqual(len(self.executor._processes), 5)
489+
def acquire_lock(lock):
490+
lock.acquire()
491+
492+
mp_context = get_context()
493+
sem = mp_context.Semaphore(0)
494+
for _ in range(3):
495+
self.executor.submit(acquire_lock, sem)
496+
self.assertEqual(len(self.executor._processes), 3)
497+
for _ in range(3):
498+
sem.release()
493499
processes = self.executor._processes
494500
self.executor.shutdown()
495501

@@ -964,6 +970,36 @@ def test_ressources_gced_in_workers(self):
964970
mgr.shutdown()
965971
mgr.join()
966972

973+
def test_saturation(self):
974+
executor = self.executor_type(4)
975+
mp_context = get_context()
976+
sem = mp_context.Semaphore(0)
977+
job_count = 15 * executor._max_workers
978+
try:
979+
for _ in range(job_count):
980+
executor.submit(sem.acquire)
981+
self.assertEqual(len(executor._processes), executor._max_workers)
982+
for _ in range(job_count):
983+
sem.release()
984+
finally:
985+
executor.shutdown()
986+
987+
def test_idle_process_reuse_one(self):
988+
executor = self.executor_type(4)
989+
executor.submit(mul, 21, 2).result()
990+
executor.submit(mul, 6, 7).result()
991+
executor.submit(mul, 3, 14).result()
992+
self.assertEqual(len(executor._processes), 1)
993+
executor.shutdown()
994+
995+
def test_idle_process_reuse_multiple(self):
996+
executor = self.executor_type(4)
997+
executor.submit(mul, 12, 7).result()
998+
executor.submit(mul, 33, 25)
999+
executor.submit(mul, 25, 26).result()
1000+
executor.submit(mul, 18, 29)
1001+
self.assertLessEqual(len(executor._processes), 2)
1002+
executor.shutdown()
9671003

9681004
create_executor_tests(ProcessPoolExecutorTest,
9691005
executor_mixins=(ProcessPoolForkMixin,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
2+
demand, only when there are no available idle workers to reuse. This reduces
3+
startup overhead and the amount of CPU time lost to idle workers.
4+
Patch by Kyle Stanley.

0 commit comments

Comments
 (0)