gh-115634: Force the process pool to adjust when a process worker exits #115642

Open · wants to merge 3 commits into base: main
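For context, a minimal script that reproduces the hang this PR fixes, adapted from the regression test added below (the `mul` worker function and the exact parameters are illustrative, not taken verbatim from the linked issue):

```python
# Hypothetical reproduction of gh-115634, adapted from the regression
# test added in this PR. Before the fix, this script could hang.
from concurrent.futures import ProcessPoolExecutor


def mul(x, y):
    # Top-level function so it is picklable under the spawn start method.
    return x * y


if __name__ == "__main__":
    # One worker, replaced after every 3 tasks. Submitting tasks faster
    # than workers are respawned confused the idle-worker accounting, so
    # the pool stopped spawning replacement workers.
    with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=3) as executor:
        futures = [executor.submit(mul, i, i) for i in range(10)]
        print([f.result() for f in futures])
```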
34 changes: 20 additions & 14 deletions Lib/concurrent/futures/process.py
@@ -362,7 +362,8 @@ def run(self):
                         with self.shutdown_lock:
                             executor._adjust_process_count()
                     else:
-                        executor._idle_worker_semaphore.release()
+                        with executor._idle_worker_lock:
+                            executor._idle_worker_number += 1
                     del executor

             if self.is_shutting_down():
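The key weakness of the semaphore is that it bottoms out at zero: it can count task completions, but not the backlog of submitted tasks waiting for a worker, so completions that happen before a worker exits leave behind stale "idle" permits. A minimal sketch of the difference (illustrative code, not from this PR):

```python
# Illustration (not code from this PR): why the semaphore misleads
# _adjust_process_count under fast submission.
import threading

sem = threading.Semaphore(0)

# One worker, three tasks submitted while it is busy: each non-blocking
# acquire fails, but nothing records the backlog of claims.
for _ in range(3):
    sem.acquire(blocking=False)    # returns False; state unchanged

# The worker finishes two tasks before exiting: two releases.
sem.release()
sem.release()

# The worker now exits (max_tasks_per_child). The old code asked the
# semaphore "is anyone idle?" -- a stale permit answers yes, so no
# replacement worker is spawned even though tasks are still queued.
assert sem.acquire(blocking=False) is True

# A plain integer under a lock can go negative, recording the backlog:
idle = 0
for _ in range(3):
    idle -= 1                      # each submit claims a worker
idle += 2                          # two completions
assert idle == -1                  # one unmet claim remains -> must spawn
```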
@@ -707,7 +708,8 @@ def __init__(self, max_workers=None, mp_context=None,
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
-        self._idle_worker_semaphore = threading.Semaphore(0)
+        self._idle_worker_lock = threading.Lock()
+        self._idle_worker_number = 0
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
@@ -754,19 +756,21 @@ def _start_executor_manager_thread(self):
                 self._executor_manager_thread_wakeup

     def _adjust_process_count(self):
-        # if there's an idle process, we don't need to spawn a new one.
-        if self._idle_worker_semaphore.acquire(blocking=False):
-            return
+        with self._idle_worker_lock:
+            # if there's an idle process, we don't need to spawn a new one.
+            if self._idle_worker_number > 0:
+                return

-        process_count = len(self._processes)
-        if process_count < self._max_workers:
-            # Assertion disabled as this codepath is also used to replace a
-            # worker that unexpectedly dies, even when using the 'fork' start
-            # method. That means there is still a potential deadlock bug. If a
-            # 'fork' mp_context worker dies, we'll be forking a new one when
-            # we know a thread is running (self._executor_manager_thread).
-            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
-            self._spawn_process()
+            process_count = len(self._processes)
+            if process_count < self._max_workers:
+                # Assertion disabled as this codepath is also used to replace a
+                # worker that unexpectedly dies, even when using the 'fork' start
+                # method. That means there is still a potential deadlock bug. If a
+                # 'fork' mp_context worker dies, we'll be forking a new one when
+                # we know a thread is running (self._executor_manager_thread).
+                #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
+                self._spawn_process()
+                self._idle_worker_number += 1

     def _launch_processes(self):
         # https://github.com/python/cpython/issues/90622
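Taken together, the counter follows a small protocol: spawn only when nothing is idle, count a fresh worker as idle, decrement on submit, increment on completion. Below is a toy single-threaded model of that accounting (all names hypothetical; the real code guards each step with `_idle_worker_lock` and spawns real processes):

```python
# Toy, single-threaded model of the idle-worker accounting in this PR.
class PoolModel:
    def __init__(self, max_workers):
        self.max_workers = max_workers
        self.workers = []
        self.idle = 0  # mirrors _idle_worker_number; may go negative

    def adjust_process_count(self):
        # Spawn only if nobody is idle and there is room to grow; a
        # freshly spawned worker starts out counted as idle.
        if self.idle > 0:
            return
        if len(self.workers) < self.max_workers:
            self.workers.append(object())
            self.idle += 1

    def submit(self):
        self.adjust_process_count()
        self.idle -= 1          # the submitted task claims one worker

    def task_finished(self):
        self.idle += 1          # its worker becomes idle again

    def worker_exited(self):
        # max_tasks_per_child reached: the worker is gone, not idle, so
        # adjust_process_count() sees idle <= 0 and spawns a replacement.
        self.workers.pop()
        self.adjust_process_count()


pool = PoolModel(max_workers=1)
pool.submit()               # spawns the only worker; idle: 1 -> 0
pool.submit()               # pool saturated; idle goes negative: -1
pool.task_finished()        # first task done; idle: -1 -> 0
pool.worker_exited()        # second task hit the per-worker task limit:
                            # the exited worker is replaced; idle: 0 -> 1
assert pool.idle == 1 and len(pool.workers) == 1
```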
@@ -808,6 +812,8 @@ def submit(self, fn, /, *args, **kwargs):

             if self._safe_to_dynamically_spawn_children:
                 self._adjust_process_count()
+            with self._idle_worker_lock:
+                self._idle_worker_number -= 1
             self._start_executor_manager_thread()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
Member commented:

This is decrementing `_idle_worker_number` just after it was incremented by `_adjust_process_count`. Perhaps we can simplify this by adding an optional argument to `_adjust_process_count`, e.g. `self._adjust_process_count(increment_idle_workers=False)`.

Member Author replied:

We can. As I said, I think explicitly decrementing the worker number at the point where a task is submitted is easier to follow. `_adjust_process_count` may or may not increment the idle worker count, depending on whether the maximum worker number has been reached. If we also fold in an option that may or may not decrement the counter, that becomes harder to follow. `increment_idle_workers` is not a good name because it does not really mean anything; `decrement_idle_workers` is acceptable, but still a bit convoluted.

The current logic is:

  1. `_adjust_process_count()` only adjusts the process count and changes `_idle_worker_number` accordingly. It does nothing else.
  2. When a new task is submitted, `_idle_worker_number` is decremented.
  3. When a task is finished, `_idle_worker_number` is incremented.

I think this is much easier to follow than "figure out whether `_adjust_process_count()` should decrement the counter", because why would it? The function, as its name says, just adjusts the process count.
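For concreteness, the reviewer's suggestion might look roughly like this (hypothetical sketch, not code from this PR):

```python
# Hypothetical sketch of the reviewer's suggestion; not code from this PR.
def _adjust_process_count(self, increment_idle_workers=True):
    with self._idle_worker_lock:
        if (self._idle_worker_number <= 0
                and len(self._processes) < self._max_workers):
            self._spawn_process()
            self._idle_worker_number += 1
        if not increment_idle_workers:
            # Called from submit(): the new task claims a worker. The
            # counter may go negative while the pool is saturated.
            self._idle_worker_number -= 1
```

Behaviorally this can be made equivalent, but the flag ends up encoding who the caller is rather than what the function does, which is the twist the author wants to avoid.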
18 changes: 18 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
@@ -168,6 +168,24 @@ def test_max_tasks_per_child(self):

         executor.shutdown()

+    def test_max_tasks_per_child_with_fast_submit(self):
+        # gh-115634:
+        # If many tasks are submitted quickly, the idle worker count
+        # could become wrong, so no new worker would be spawned.
+
+        context = self.get_context()
+        if context.get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
+
+        executor = self.executor_type(
+            1, mp_context=context, max_tasks_per_child=3)
+
+        # Without the fix, this would halt the process: no worker is
+        # available and the pool won't spawn more.
+        _ = [executor.submit(mul, i, i) for i in range(10)]
+
+        executor.shutdown()
+
     def test_max_tasks_per_child_defaults_to_spawn_context(self):
         # not using self.executor as we need to control construction.
         # arguably this could go in another class w/o that mixin.
@@ -0,0 +1 @@
+Fixed a hang in :class:`ProcessPoolExecutor` when task submission is fast enough to saturate **max_workers** and all worker processes are then killed due to **max_tasks_per_child**.