bpo-24882: Let ThreadPoolExecutor reuse idle threads before creating new thread #6375
Conversation
As discussed on the issue tracker, I don't think it's a good idea to add this.
Added comments on the issue tracker in reply. I was hoping the original author would comment first, as they had a TODO requesting that this issue be fixed in the future. Also, it's been more than a week since I submitted my CLA; should I be concerned?
By the way: I think CLA processing is a bit slow lately. If this persists I'll try pinging the PSF.
Lib/test/test_concurrent_futures.py
Outdated
@@ -318,7 +318,7 @@ def test_threads_terminate(self):
         self.executor.submit(mul, 21, 2)
         self.executor.submit(mul, 6, 7)
         self.executor.submit(mul, 3, 14)
-        self.assertEqual(len(self.executor._threads), 3)
+        self.assertTrue(len(self.executor._threads) < 3)
You've submitted three things, so why wouldn't it be possible for it to have launched three threads? Granted, the work per task is cheap, so it's possible (maybe even likely, given GIL interference) that the first task is done in time for its thread to be reused by the third task, but that's not guaranteed, is it?
I agree that this test might fail randomly. It would be better to force the collection of the 3 tasks with calls to result() before launching the next one, and verify that only one thread has been used.
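The suggested fix might look like the following sketch, stripped of the unittest scaffolding (`mul` is the helper used by CPython's test suite, redefined here so the snippet is self-contained; `_threads` is a private attribute, inspected purely for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def mul(x, y):
    return x * y

executor = ThreadPoolExecutor(max_workers=3)

# Block on each result before the next submit, so the worker that ran
# the previous task has a chance to go idle and be reused.
r1 = executor.submit(mul, 21, 2).result()
r2 = executor.submit(mul, 6, 7).result()
r3 = executor.submit(mul, 3, 14).result()

# With idle-thread reuse the pool should not have needed all three
# workers; there is still a small window between the future resolving
# and the worker marking itself idle, so avoid asserting exactly 1.
thread_count = len(executor._threads)
executor.shutdown()
```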
@tomMoral Would you like to give this a review?
The implementation seems to be doing the job. I think it would be a bit simpler using a Semaphore instead of a lock-protected counter.
I am unsure about the race conditions that might occur. I do not see any obvious ones here, but to be on the safe side it would be nice to have a test checking that you can indeed saturate the ThreadPoolExecutor with many small tasks. For instance:
def test_saturation(self):
    n = 100 * self.executor._max_workers
    # map() takes parallel positional iterables, not keyword arguments
    list(self.executor.map(mul, range(n), [0] * n))
    self.assertEqual(len(self.executor._threads), self.executor._max_workers)
Lib/concurrent/futures/thread.py
Outdated
@@ -129,6 +136,8 @@ def __init__(self, max_workers=None, thread_name_prefix='',
         self._max_workers = max_workers
         self._work_queue = queue.SimpleQueue()
+        self._idle_lock = threading.Lock()
+        self._idle_count = 0
Why don't you use a threading.Semaphore for this? It makes the implementation easier to read: you don't need to handle the lock and the increments, and can rely only on acquire and release.
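The two patterns side by side, as a hedged sketch (names like `try_claim_idle_counter` are illustrative, not from the patch):

```python
import threading

# Pattern 1: lock-protected counter, as in the original patch.
idle_lock = threading.Lock()
idle_count = 0

def try_claim_idle_counter():
    """Atomically decrement the idle count if it is positive."""
    global idle_count
    with idle_lock:
        if idle_count > 0:
            idle_count -= 1
            return True
        return False

# Pattern 2: a semaphore collapses the lock, the counter, and the
# check into acquire()/release().  acquire(timeout=0) never blocks:
# it returns False immediately when the counter is zero.
idle_semaphore = threading.Semaphore(0)

def try_claim_idle_semaphore():
    return idle_semaphore.acquire(timeout=0)
```

Either way, a worker signals idleness by incrementing (releasing), and submit tries to claim an idle worker before spawning a new thread.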
Lib/concurrent/futures/thread.py
Outdated
with self._idle_lock:
    if self._idle_count > 0:
        self._idle_count -= 1
        return
If _idle_count is a semaphore, you simply need to use:

if self._idle_count.acquire(timeout=0):
    return
Lib/concurrent/futures/thread.py
Outdated
# attempt to increment idle count
executor = executor_reference()
if executor is not None:
    executor._increase_idle_count()
If _idle_count is a semaphore, you just need to increase the count with release:

executor._idle_count.release()
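Putting the two halves together, a minimal sketch of the reuse protocol (toy code, not the actual thread.py; queue draining, shutdown handling, and weak references are omitted):

```python
import queue
import threading

work_queue = queue.Queue()
idle_semaphore = threading.Semaphore(0)
threads = []

def worker():
    while True:
        item = work_queue.get()
        if item is None:          # sentinel: shut this worker down
            return
        item()                    # run the task
        idle_semaphore.release()  # announce "I am idle again"

def submit(task, max_workers=4):
    work_queue.put(task)
    # Reuse an idle worker if one has announced itself; otherwise
    # spawn a new thread, up to max_workers.
    if idle_semaphore.acquire(timeout=0):
        return
    if len(threads) < max_workers:
        t = threading.Thread(target=worker, daemon=True)
        t.start()
        threads.append(t)
```

Tasks queued before the sentinels are always drained first (FIFO), so shutting down by pushing one `None` per thread and joining is safe in this sketch.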
Since GH-6530, I've changed my opinion, so I'm closing that PR.
Thank you for the review. I've made the suggested changes.
@tomMoral, please re-review. Thanks!
Thank you for the update. Here are a few more comments, mostly around tests.
A Python core developer has requested some changes be made to your pull request before we can consider merging it. If you could please address their requests along with any other requests in other reviews from core developers that would be appreciated. Once you have made the requested changes, please leave a comment on this pull request containing the phrase
I have made the requested changes; please review again. One note - I wasn't 100% sure what best practice would be for the tests in the ThreadPoolExecutorTest class. Since I wanted a clean executor for both the saturation test and the idle reuse test, I ended up creating a new executor for each test (instead of using the one within the class instance). Hopefully that is acceptable.
Adjust the shutdown test so that, after submitting three jobs to the executor, it checks for fewer than three threads instead of exactly three. If idle threads are being recycled properly, we should see fewer than three threads.
As suggested by reviewer tomMoral, swapped the lock-protected counter for a semaphore to track the number of unused threads. Adjusted test_threads_terminate to wait for completion of the previous future before submitting a new one (and checking the number of threads used). Also added a new test to confirm the thread pool can be saturated.
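The saturation test can be approximated outside the unittest harness like this (a sketch; `mul` is redefined locally, and `_threads`/`_max_workers` are private attributes inspected purely for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def mul(x, y):
    return x * y

executor = ThreadPoolExecutor(max_workers=4)
n = 100 * executor._max_workers

# Flood the pool with many cheap tasks; map() takes parallel
# iterables, so the constant second argument is passed as a list.
results = list(executor.map(mul, range(n), [0] * n))

# _adjust_thread_count never creates more than max_workers threads.
saturated = len(executor._threads)
executor.shutdown()
```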
Thanks @iUnknwn. I've rebased and will merge if CI is green.
@pitrou: Please replace
# attempt to increment idle count
executor = executor_reference()
if executor is not None:
    executor._idle_semaphore.release()
Is _idle_semaphore representing idle threads? If more than max_workers jobs are submitted at the same time, won't the _idle_semaphore value be greater than the thread count?
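The concern is easy to state with plain semaphore semantics: threading.Semaphore enforces no upper bound, so nothing in the type itself ties the counter to a thread count (illustrative snippet; whether the value stays bounded depends entirely on the surrounding code releasing only when a worker actually goes idle):

```python
import threading

sem = threading.Semaphore(0)

# release() increments the internal counter with no ceiling check ...
for _ in range(5):
    sem.release()

# ... and a non-blocking drain shows all five permits were recorded.
claimed = 0
while sem.acquire(timeout=0):
    claimed += 1
```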
The change fixes an issue where ThreadPoolExecutor doesn't check if existing threads are idle before spinning up new ones. To do this, we use a simple counter within the Executor to track how many threads are idle, and atomically increment and decrement the count as needed.
One question: the previous code to spin up a new thread in _adjust_thread_count does not appear thread-safe. If two threads are both submitting items to the executor at the same time, the check of the number of threads could be invalid. Is this something that should also be fixed, or is it not an issue because of the global interpreter lock?

https://bugs.python.org/issue24882