
gh-115634: Force the process pool to adjust when a process worker exits #115642


Open · wants to merge 3 commits into main

Conversation

Member

@gaogaotiantian commented Feb 19, 2024

concurrent.futures.ProcessPoolExecutor fails to correctly count the available worker processes, which leads to a hang in the following program:

from concurrent.futures import ProcessPoolExecutor
if __name__ == '__main__':
    with ProcessPoolExecutor(1, max_tasks_per_child=2) as exe:
        futs = [exe.submit(print, i) for i in range(10)]

This is a result of the combination of #19453 and #27373. #19453 introduced _idle_worker_semaphore to check whether there are idle worker processes, which saves time by not spawning all the processes unconditionally at start time. The accounting is clever: use the semaphore as the indicator and try to acquire it on every submit. When the acquire fails, spawn a new worker process. When a result is returned from a process, release the semaphore, because that means a worker process is available again.
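For readers less familiar with that code, here is a minimal sketch of the accounting described above; the class and helper names (_IdleAccountingSketch, _spawn_process, _on_result_received) are hypothetical and this is not the actual concurrent.futures implementation:

    import threading

    class _IdleAccountingSketch:
        """Sketch of the idle-worker accounting introduced by #19453."""

        def __init__(self, max_workers):
            self._max_workers = max_workers
            self._processes = {}                 # pid -> process handle
            self._idle_worker_semaphore = threading.Semaphore(0)

        def submit(self, fn, *args, **kwargs):
            # Every submit may need a worker, so check the pool.
            self._adjust_process_count()
            ...  # enqueue the work item (omitted)

        def _adjust_process_count(self):
            # If an idle worker is (supposedly) available, do nothing.
            if self._idle_worker_semaphore.acquire(blocking=False):
                return
            # Otherwise spawn one, but only while below the limit.
            if len(self._processes) < self._max_workers:
                self._spawn_process()

        def _on_result_received(self):
            # A worker finished a task, so mark one worker as idle again.
            self._idle_worker_semaphore.release()

        def _spawn_process(self):
            # Stub standing in for actually starting a worker process.
            self._processes[len(self._processes) + 1] = object()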

The math works until the number of worker processes reaches the maximum. From then on the semaphore is meaningless, because submit no longer changes it or spawns a new process, while the returned results keep releasing it. That was fine at the time: once the pool is full, the semaphore has served its purpose and is not needed anymore.

However, #27373 then introduced another mechanism, max_tasks_per_child, which kills a worker process after it has processed a certain number of tasks. Unfortunately, it reused _adjust_process_count to re-spawn workers, and that is exactly the mechanism above: check the semaphore, then decide whether a new worker process should be spawned.

As mentioned, the semaphore is meaningless once the number of worker processes reaches the maximum, so this mechanism simply does not work. After the worker process is killed, no new process is spawned, because the pool manager believes there are still idle workers.

An example sequence with max_workers=1, max_tasks_per_child=2:

  1. submit(0) - acquire semaphore (fails) - spawn a worker process
  2. submit(1) - acquire semaphore (fails) - reached max_workers, do nothing
  3. process(0) - task 1/2 for the worker process
  4. get_result(0) - release semaphore (value=1)
  5. process(1) - task 2/2 for the worker process - exit and let the manager know it's dead
  6. get_result(1) - realize the worker process is dead, call `_adjust_process_count`, which **acquires the semaphore and succeeds**! No need to spawn any workers.
  7. DEAD - the remaining futures never run

I used the simplest possible fix: force _adjust_process_count to check the actual process count when a worker exits, which fixes the issue. We could do some smarter math on the semaphore, but I believe that would require reading its value, which is not part of the public documentation, so I'm a bit hesitant to do that.
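Continuing the sketch from above, the forced path could look something like this (again a sketch of the idea, not the literal patch):

    def _adjust_process_count(self, force=False):
        # When a worker has just exited, the semaphore value no longer
        # reflects reality, so skip the idle check entirely and rely on
        # the actual number of live worker processes.
        if not force:
            if self._idle_worker_semaphore.acquire(blocking=False):
                return
        if len(self._processes) < self._max_workers:
            self._spawn_process()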

@gaogaotiantian
Member Author

@pitrou could you take a look at the fix as you are the reviewer for both PRs? Thanks!

Comment on lines 362 to 365
                with self.shutdown_lock:
-                   executor._adjust_process_count()
+                   executor._adjust_process_count(force=True)
            else:
                executor._idle_worker_semaphore.release()
Member

I think we should think more generally about what _idle_worker_semaphore is supposed to mean.

  1. If the process has not exited, we release _idle_worker_semaphore, implying that the process was marked busy and now switches to idle.
  2. If the process has exited, we call _adjust_process_count, which acquires _idle_worker_semaphore as if the process had switched from idle to busy... but it has not (it was already considered busy).

So it seems to me that, if the process has exited, we should simply not acquire the semaphore as the process was not accounted as idle.

This would change the code to something like:

    def _require_process(self):
        """
        Require a process for task execution, potentially spawning a new one.
        """
        # if there's an idle process, we don't need to spawn a new one.
        if not self._idle_worker_semaphore.acquire(blocking=False):
            self._adjust_process_count()

    def _adjust_process_count(self):
        ...

Member

And then call _require_process from submit, and call _adjust_process_count when a worker process has exited.
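Concretely, the two call sites would then look roughly like this (a sketch only; _on_worker_exited is a hypothetical name for the executor manager's handling of a dead worker):

    def submit(self, fn, /, *args, **kwargs):
        ...
        # A task is about to occupy a worker: reuse an idle one if
        # possible, otherwise spawn a new one.
        self._require_process()
        ...

    def _on_worker_exited(self, executor):
        # The dead worker was busy, not idle, so the semaphore is left
        # alone; just top the pool back up.
        with self.shutdown_lock:
            executor._adjust_process_count()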

Member Author

_idle_worker_semaphore becomes meaningless once we reach the maximum number of workers, because the math is wrong from then on. Each submit tries to acquire the semaphore, but with blocking=False, which means submit will not necessarily decrement the counter. However, every finished task releases the semaphore, so the counter keeps increasing and just becomes something huge. The semaphore is useless at that point and any check against it is meaningless: acquire will always succeed and no new process will be spawned.

The math behind _idle_worker_semaphore is simply fragile; it only works as long as we have not reached the maximum number of workers.
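A tiny standalone illustration of that drift (plain threading, no executor involved): once the non-blocking acquires start failing, only the releases change the counter, so it grows without bound and every later check succeeds.

    import threading

    sem = threading.Semaphore(0)

    # Two submits arrive while no worker is idle: both non-blocking
    # acquires fail, so the counter stays at 0 instead of going to -2.
    print(sem.acquire(blocking=False))  # False
    print(sem.acquire(blocking=False))  # False

    # The two tasks finish: each result releases the semaphore.
    sem.release()
    sem.release()

    # A worker now dies and the manager checks the semaphore before
    # re-spawning: the acquire succeeds even though no worker is idle,
    # so no replacement worker is started.
    print(sem.acquire(blocking=False))  # True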

Member

Ok... so the fix should address the problem of _idle_worker_semaphore being meaningless, IMHO.

_idle_worker_semaphore is not really used for synchronization since we never block on it. We merely use it as some kind of atomic integer. So perhaps we should replace that semaphore with a lock-protected integer (for lack of an atomic integer primitive in the Python stdlib). That would allow its value to become negative, and the accounting would become correct.
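Such a lock-protected integer could be as simple as the following (a sketch; _IdleWorkerCount is a made-up name for illustration):

    import threading

    class _IdleWorkerCount:
        """Lock-protected counter; unlike a Semaphore, its value may go negative."""

        def __init__(self):
            self._lock = threading.Lock()
            self._value = 0

        def add(self, delta):
            # Adjust the count and return the new value, all under the lock.
            with self._lock:
                self._value += delta
                return self._value

A submitted task would then call add(-1), a returned result add(+1), and a non-positive value would mean no worker is currently idle.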

Member Author

Yes, we can do that. threading.Semaphore has a private _value attribute to access the value, which is what I originally mentioned could make this work, but I'm hesitant to use it even internally. We can replace _idle_worker_semaphore entirely and make the accounting always valid. Do you want me to do that instead?

Member

So _adjust_process_count will do nothing if a forked worker dies.

That sounds wrong and that also contradicts the longish comment in _adjust_process_count ("Assertion disabled as this codepath ...").

Member

Concretely, it seems that:

  1. if a worker exits, _adjust_process_count is called and may launch another worker (in which case it will increment the count). That's regardless of the start method.
  2. if a task is submitted, _adjust_process_count is called and the worker count is updated only if _safe_to_dynamically_spawn_children is true.

It seems this creates a problem: if _safe_to_dynamically_spawn_children is false, the accounting of idle workers will break when workers exit and new tasks are submitted.

The solution should simply be to remove the _safe_to_dynamically_spawn_children condition in submit.

Member Author

The solution should simply be to remove the _safe_to_dynamically_spawn_children condition in submit.

That would be wrong, as it would spawn a process in the fork case before _start_executor_manager_thread initializes the process pool. The original reason why this condition was added: #90622

Do we want to go deeper into this? We need to decide which is the better option:

  1. Risk a deadlock in order to always saturate the process pool
  2. Risk leaving the process pool unsaturated

We can't avoid 2. entirely, because we rely on the return message from the worker process, and the process could die before it manages to send that message.

Also, removing the condition for good has a performance impact on every submit in the fork case, though that may not matter much.

Member Author

Also, besides the fact that a new process will be spawned, the math is still incorrect, because with fork all the worker processes are spawned without going through the counter calculation; we would need to change that as well if we want to make it work. How large an impact do we want this PR to have? Do we want to fix the whole function for good? If so, we probably need some extra design discussion, for example whether we should check if the process pool is idle when a process dies in the fork case.

BTW, again, my first patch actually happens to solve this issue :)

Member Author

That sounds wrong and that also contradicts the longish comment in _adjust_process_count ("Assertion disabled as this codepath ...").

And yes, that comment has been wrong since this optimization was added. It was never correct due to the incorrect math.

@@ -808,6 +812,8 @@ def submit(self, fn, /, *args, **kwargs):

             if self._safe_to_dynamically_spawn_children:
                 self._adjust_process_count()
+            with self._idle_worker_lock:
+                self._idle_worker_number -= 1
Member

This is decrementing _idle_worker_number just after it was incremented by _adjust_process_count; perhaps we can simplify this by adding an optional argument to _adjust_process_count, e.g. self._adjust_process_count(increment_idle_workers=False).

Member Author

We can. As I said, I think explicitly decrementing the idle-worker count at the point where a task is submitted is easier to follow. _adjust_process_count may or may not increment the idle-worker count, depending on whether the maximum number of workers has been reached. If we also make it optionally decrement the counter, that becomes harder to follow. increment_idle_workers is not a good name because it does not really mean anything; decrement_idle_workers is acceptable, but still a bit convoluted.

The current logic is:

  1. _adjust_process_count() only adjusts the process count and changes _idle_worker_number accordingly. It does nothing else.
  2. When a new task is submitted, _idle_worker_number is decremented.
  3. When a task is finished, _idle_worker_number is incremented.

I think this is much easier to follow than "figure out whether _adjust_process_count() should decrement the counter", because why would it? The function just adjusts the process count.
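A condensed sketch of those three rules (hypothetical structure and helper names, not the literal diff):

    import threading

    class _PoolAccountingSketch:
        """Sketch of the lock-protected idle-worker accounting described above."""

        def __init__(self, max_workers):
            self._max_workers = max_workers
            self._processes = {}
            self._idle_worker_lock = threading.Lock()
            self._idle_worker_number = 0

        def _adjust_process_count(self):
            # Rule 1: only adjust the process count, and account for the
            # worker that is actually spawned (if any).
            if len(self._processes) < self._max_workers:
                self._spawn_process()
                with self._idle_worker_lock:
                    self._idle_worker_number += 1

        def submit(self, fn, *args, **kwargs):
            # Rule 2: a newly submitted task occupies one worker.
            self._adjust_process_count()
            with self._idle_worker_lock:
                self._idle_worker_number -= 1
            ...  # enqueue the work item (omitted)

        def _on_result_received(self):
            # Rule 3: a finished task frees one worker.
            with self._idle_worker_lock:
                self._idle_worker_number += 1

        def _spawn_process(self):
            # Stub standing in for actually starting a worker process.
            self._processes[len(self._processes) + 1] = object()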

@gaogaotiantian
Member Author

Hi @pitrou, do you have any more concerns about this PR? Do we want to fix the fork case for good? Or can we make a simple, strict improvement that fixes some of the issues without introducing any new ones? Yes, some broken parts will still be broken, but it's better than the status quo.

@gaogaotiantian
Member Author

Hi @pitrou, are you still interested in reviewing this PR? If you don't feel comfortable discussing this change, I can try to ask other core devs. I'm trying to push this because there is not just one but two issues that I know of reporting this problem (I did not search the history for other possibly related issues). I believe this is worth fixing; it's a pretty bad bug that hangs the process.

@mj0nez

mj0nez commented Jul 10, 2024

Hey, any update on this topic? It would be great to use this feature. :)

@gaogaotiantian
Member Author

I'm not sure whether @pitrou is still interested in this PR, and I'd like to move it forward. If he does not have time, I'm happy to find someone else to review it.

@gpshead gpshead self-requested a review October 14, 2024 20:05
@gpshead gpshead self-assigned this Oct 14, 2024
@gpshead
Member

gpshead commented Oct 14, 2024

I'm putting this on my plate to look at but I can't promise I'll get to it soon.

In general: The on demand worker spawning logic, while an understandably desirable feature, has been a source of a number of bugs. The workaround for people running into these remains: Don't use those features yet.

@tabrezm

tabrezm commented Oct 28, 2024

I believe #119592 and #111498 would also be resolved by this PR. Thanks for the proposed fix!

@gaogaotiantian
Member Author

I think we should try to make some progress on this PR; it solves a real issue that has been reported by multiple users.
