
bpo-24882: Let ThreadPoolExecutor reuse idle threads before creating new thread #6375


Merged (7 commits) · May 22, 2019
15 changes: 13 additions & 2 deletions Lib/concurrent/futures/thread.py
@@ -80,7 +80,14 @@ def _worker(executor_reference, work_queue, initializer, initargs):
work_item.run()
# Delete references to object. See issue16284
del work_item

# attempt to increment idle count
executor = executor_reference()
if executor is not None:
executor._idle_semaphore.release()

Review comment: Does _idle_semaphore represent the number of idle threads? If more than max_workers jobs are submitted at the same time, won't the _idle_semaphore value be greater than the thread count? (A sketch of the release/acquire pairing follows this file's diff.)

del executor
continue

executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
@@ -133,6 +140,7 @@ def __init__(self, max_workers=None, thread_name_prefix='',

self._max_workers = max_workers
self._work_queue = queue.SimpleQueue()
self._idle_semaphore = threading.Semaphore(0)
self._threads = set()
self._broken = False
self._shutdown = False
@@ -178,12 +186,15 @@ def submit(*args, **kwargs):
submit.__doc__ = _base.Executor.submit.__doc__

def _adjust_thread_count(self):
# if idle threads are available, don't spin new threads
if self._idle_semaphore.acquire(timeout=0):
return

# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.

num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
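The review question above comes down to how the release/acquire calls pair up: each completed work item does one release() in _worker, and each submit() does at most one non-blocking acquire() in _adjust_thread_count. Below is a minimal standalone sketch of that pairing — toy code, not the PR's implementation, with made-up function names for illustration.

import threading

# Standalone toy model (not the PR code): one release() per finished work
# item, at most one non-blocking acquire() per submit().
idle = threading.Semaphore(0)

def work_item_finished():
    # Mirrors _worker: signal that this worker is about to go back to
    # waiting on the queue.
    idle.release()

def submit_needs_new_thread():
    # Mirrors _adjust_thread_count: a successful acquire means a recent
    # completion / idle worker can absorb the new item, so no new thread.
    return not idle.acquire(timeout=0)

# A burst of submits before anything completes: every acquire fails, so
# threads are still created, capped by max_workers as before.
print(submit_needs_new_thread())   # True -> would spawn a thread

# Several completions with no submit in between: the counter climbs and can
# exceed the number of threads that are literally idle at that instant; it
# really counts completions not yet matched by a later submit. That only
# suppresses creating extra threads -- queued items are still drained by
# the existing workers.
work_item_finished()
work_item_finished()
print(submit_needs_new_thread())   # False -> reuse an existing worker
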
32 changes: 29 additions & 3 deletions Lib/test/test_concurrent_futures.py
@@ -346,10 +346,15 @@ def _prime_executor(self):
pass

def test_threads_terminate(self):
self.executor.submit(mul, 21, 2)
self.executor.submit(mul, 6, 7)
self.executor.submit(mul, 3, 14)
def acquire_lock(lock):
lock.acquire()

sem = threading.Semaphore(0)
for i in range(3):
self.executor.submit(acquire_lock, sem)
self.assertEqual(len(self.executor._threads), 3)
for i in range(3):
sem.release()
self.executor.shutdown()
for t in self.executor._threads:
t.join()
@@ -753,6 +758,27 @@ def test_default_workers(self):
self.assertEqual(executor._max_workers,
(os.cpu_count() or 1) * 5)

def test_saturation(self):
executor = self.executor_type(4)
def acquire_lock(lock):
lock.acquire()

sem = threading.Semaphore(0)
for i in range(15 * executor._max_workers):
executor.submit(acquire_lock, sem)
self.assertEqual(len(executor._threads), executor._max_workers)
for i in range(15 * executor._max_workers):
sem.release()
executor.shutdown(wait=True)

def test_idle_thread_reuse(self):
executor = self.executor_type()
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
self.assertEqual(len(executor._threads), 1)
executor.shutdown(wait=True)


class ProcessPoolExecutorTest(ExecutorTest):

1 change: 1 addition & 0 deletions (news entry; filename not captured)
@@ -0,0 +1 @@
Change ThreadPoolExecutor to use existing idle threads before spinning up new ones.
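
As a quick illustration of the entry above — a hedged sketch mirroring the new test_idle_thread_reuse test, not part of the patch: submitting tasks one at a time and waiting on each result should leave the executor with a single worker thread, where previously each submit() could start a new one.

from concurrent.futures import ThreadPoolExecutor

def mul(x, y):
    return x * y

with ThreadPoolExecutor() as executor:
    # Each .result() waits for completion, so the worker is idle again
    # before the next submit() and is reused instead of a new thread
    # being started.
    for args in [(21, 2), (6, 7), (3, 14)]:
        assert executor.submit(mul, *args).result() == 42
    # _threads is a private attribute, inspected here only for illustration;
    # with this change it should normally contain a single thread.
    print(len(executor._threads))   # expected: 1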