Skip to content

Commit 2a43afd

Browse files
authored
[3.9] [3.10] gh-91607: Fix several test_concurrent_futures tests to actually test what they claim (GH-91600) (GH-91612) (#91617)
* Fix test_concurrent_futures to actually test what it says. Many ProcessPoolExecutor based tests were ignoring the mp_context and using the default instead. This meant we lacked proper test coverage of all of them. Also removes the old _prime_executor() worker delay seeding code as it appears to have no point and causes 20-30 seconds extra latency on this already long test. It also interfered with some of the refactoring to fix the above to not needlessly create their own executor when setUp has already created an appropriate one. * Don't import the name from multiprocessing directly to avoid confusion. (cherry picked from commit 7fa3a5a) (cherry picked from commit 9a45893)
1 parent c171d75 commit 2a43afd

File tree

2 files changed

+43
-54
lines changed

2 files changed

+43
-54
lines changed

Lib/test/test_concurrent_futures.py

+42-54
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
2727
BrokenExecutor)
2828
from concurrent.futures.process import BrokenProcessPool
29-
from multiprocessing import get_context
3029

3130
import multiprocessing.process
3231
import multiprocessing.util
32+
import multiprocessing as mp
3333

3434

3535
if support.check_sanitizer(address=True, memory=True):
@@ -131,7 +131,6 @@ def setUp(self):
131131
self.executor = self.executor_type(
132132
max_workers=self.worker_count,
133133
**self.executor_kwargs)
134-
self._prime_executor()
135134

136135
def tearDown(self):
137136
self.executor.shutdown(wait=True)
@@ -145,15 +144,7 @@ def tearDown(self):
145144
super().tearDown()
146145

147146
def get_context(self):
148-
return get_context(self.ctx)
149-
150-
def _prime_executor(self):
151-
# Make sure that the executor is ready to do work before running the
152-
# tests. This should reduce the probability of timeouts in the tests.
153-
futures = [self.executor.submit(time.sleep, 0.1)
154-
for _ in range(self.worker_count)]
155-
for f in futures:
156-
f.result()
147+
return mp.get_context(self.ctx)
157148

158149

159150
class ThreadPoolMixin(ExecutorMixin):
@@ -261,9 +252,6 @@ def test_initializer(self):
261252
with self.assertRaises(BrokenExecutor):
262253
self.executor.submit(get_init_status)
263254

264-
def _prime_executor(self):
265-
pass
266-
267255
@contextlib.contextmanager
268256
def _assert_logged(self, msg):
269257
if self.log_queue is not None:
@@ -350,14 +338,14 @@ def test_hang_issue12364(self):
350338
f.result()
351339

352340
def test_cancel_futures(self):
353-
executor = self.executor_type(max_workers=3)
354-
fs = [executor.submit(time.sleep, .1) for _ in range(50)]
355-
executor.shutdown(cancel_futures=True)
341+
assert self.worker_count <= 5, "test needs few workers"
342+
fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
343+
self.executor.shutdown(cancel_futures=True)
356344
# We can't guarantee the exact number of cancellations, but we can
357-
# guarantee that *some* were cancelled. With setting max_workers to 3,
358-
# most of the submitted futures should have been cancelled.
345+
# guarantee that *some* were cancelled. With few workers, many of
346+
# the submitted futures should have been cancelled.
359347
cancelled = [fut for fut in fs if fut.cancelled()]
360-
self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
348+
self.assertGreater(len(cancelled), 20)
361349

362350
# Ensure the other futures were able to finish.
363351
# Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -370,33 +358,32 @@ def test_cancel_futures(self):
370358
# Similar to the number of cancelled futures, we can't guarantee the
371359
# exact number that completed. But, we can guarantee that at least
372360
# one finished.
373-
self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
361+
self.assertGreater(len(others), 0)
374362

375-
def test_hang_issue39205(self):
363+
def test_hang_gh83386(self):
376364
"""shutdown(wait=False) doesn't hang at exit with running futures.
377365
378-
See https://bugs.python.org/issue39205.
366+
See https://github.com/python/cpython/issues/83386.
379367
"""
380368
if self.executor_type == futures.ProcessPoolExecutor:
381369
raise unittest.SkipTest(
382-
"Hangs due to https://bugs.python.org/issue39205")
370+
"Hangs, see https://github.com/python/cpython/issues/83386")
383371

384372
rc, out, err = assert_python_ok('-c', """if True:
385373
from concurrent.futures import {executor_type}
386374
from test.test_concurrent_futures import sleep_and_print
387375
if __name__ == "__main__":
376+
if {context!r}: multiprocessing.set_start_method({context!r})
388377
t = {executor_type}(max_workers=3)
389378
t.submit(sleep_and_print, 1.0, "apple")
390379
t.shutdown(wait=False)
391-
""".format(executor_type=self.executor_type.__name__))
380+
""".format(executor_type=self.executor_type.__name__,
381+
context=getattr(self, 'ctx', None)))
392382
self.assertFalse(err)
393383
self.assertEqual(out.strip(), b"apple")
394384

395385

396386
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
397-
def _prime_executor(self):
398-
pass
399-
400387
def test_threads_terminate(self):
401388
def acquire_lock(lock):
402389
lock.acquire()
@@ -491,14 +478,11 @@ def test_cancel_futures_wait_false(self):
491478

492479

493480
class ProcessPoolShutdownTest(ExecutorShutdownTest):
494-
def _prime_executor(self):
495-
pass
496-
497481
def test_processes_terminate(self):
498482
def acquire_lock(lock):
499483
lock.acquire()
500484

501-
mp_context = get_context()
485+
mp_context = self.get_context()
502486
sem = mp_context.Semaphore(0)
503487
for _ in range(3):
504488
self.executor.submit(acquire_lock, sem)
@@ -512,7 +496,8 @@ def acquire_lock(lock):
512496
p.join()
513497

514498
def test_context_manager_shutdown(self):
515-
with futures.ProcessPoolExecutor(max_workers=5) as e:
499+
with futures.ProcessPoolExecutor(
500+
max_workers=5, mp_context=self.get_context()) as e:
516501
processes = e._processes
517502
self.assertEqual(list(e.map(abs, range(-5, 5))),
518503
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
@@ -521,7 +506,8 @@ def test_context_manager_shutdown(self):
521506
p.join()
522507

523508
def test_del_shutdown(self):
524-
executor = futures.ProcessPoolExecutor(max_workers=5)
509+
executor = futures.ProcessPoolExecutor(
510+
max_workers=5, mp_context=self.get_context())
525511
res = executor.map(abs, range(-5, 5))
526512
executor_manager_thread = executor._executor_manager_thread
527513
processes = executor._processes
@@ -544,7 +530,8 @@ def test_del_shutdown(self):
544530
def test_shutdown_no_wait(self):
545531
# Ensure that the executor cleans up the processes when calling
546532
# shutdown with wait=False
547-
executor = futures.ProcessPoolExecutor(max_workers=5)
533+
executor = futures.ProcessPoolExecutor(
534+
max_workers=5, mp_context=self.get_context())
548535
res = executor.map(abs, range(-5, 5))
549536
processes = executor._processes
550537
call_queue = executor._call_queue
@@ -921,7 +908,7 @@ def submit(pool):
921908
pool.submit(submit, pool)
922909

923910
for _ in range(50):
924-
with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
911+
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
925912
workers.submit(tuple)
926913

927914

@@ -991,7 +978,7 @@ def test_traceback(self):
991978
def test_ressources_gced_in_workers(self):
992979
# Ensure that argument for a job are correctly gc-ed after the job
993980
# is finished
994-
mgr = get_context(self.ctx).Manager()
981+
mgr = self.get_context().Manager()
995982
obj = EventfulGCObj(mgr)
996983
future = self.executor.submit(id, obj)
997984
future.result()
@@ -1007,36 +994,37 @@ def test_ressources_gced_in_workers(self):
1007994
mgr.join()
1008995

1009996
def test_saturation(self):
1010-
executor = self.executor_type(4)
1011-
mp_context = get_context()
997+
executor = self.executor
998+
mp_context = self.get_context()
1012999
sem = mp_context.Semaphore(0)
10131000
job_count = 15 * executor._max_workers
1014-
try:
1015-
for _ in range(job_count):
1016-
executor.submit(sem.acquire)
1017-
self.assertEqual(len(executor._processes), executor._max_workers)
1018-
for _ in range(job_count):
1019-
sem.release()
1020-
finally:
1021-
executor.shutdown()
1001+
for _ in range(job_count):
1002+
executor.submit(sem.acquire)
1003+
self.assertEqual(len(executor._processes), executor._max_workers)
1004+
for _ in range(job_count):
1005+
sem.release()
10221006

10231007
def test_idle_process_reuse_one(self):
1024-
executor = self.executor_type(4)
1008+
executor = self.executor
1009+
assert executor._max_workers >= 4
10251010
executor.submit(mul, 21, 2).result()
10261011
executor.submit(mul, 6, 7).result()
10271012
executor.submit(mul, 3, 14).result()
10281013
self.assertEqual(len(executor._processes), 1)
1029-
executor.shutdown()
10301014

10311015
def test_idle_process_reuse_multiple(self):
1032-
executor = self.executor_type(4)
1016+
executor = self.executor
1017+
assert executor._max_workers <= 5
10331018
executor.submit(mul, 12, 7).result()
10341019
executor.submit(mul, 33, 25)
10351020
executor.submit(mul, 25, 26).result()
10361021
executor.submit(mul, 18, 29)
1037-
self.assertLessEqual(len(executor._processes), 2)
1022+
executor.submit(mul, 1, 2).result()
1023+
executor.submit(mul, 0, 9)
1024+
self.assertLessEqual(len(executor._processes), 3)
10381025
executor.shutdown()
10391026

1027+
10401028
create_executor_tests(ProcessPoolExecutorTest,
10411029
executor_mixins=(ProcessPoolForkMixin,
10421030
ProcessPoolForkserverMixin,
@@ -1138,7 +1126,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False):
11381126
self.executor.shutdown(wait=True)
11391127

11401128
executor = self.executor_type(
1141-
max_workers=2, mp_context=get_context(self.ctx))
1129+
max_workers=2, mp_context=self.get_context())
11421130
res = executor.submit(func, *args)
11431131

11441132
if ignore_stderr:
@@ -1217,7 +1205,7 @@ def test_shutdown_deadlock(self):
12171205
# if a worker fails after the shutdown call.
12181206
self.executor.shutdown(wait=True)
12191207
with self.executor_type(max_workers=2,
1220-
mp_context=get_context(self.ctx)) as executor:
1208+
mp_context=self.get_context()) as executor:
12211209
self.executor = executor # Allow clean up in fail_on_deadlock
12221210
f = executor.submit(_crash, delay=.1)
12231211
executor.shutdown(wait=True)
@@ -1230,7 +1218,7 @@ def test_shutdown_deadlock_pickle(self):
12301218
# Reported in bpo-39104.
12311219
self.executor.shutdown(wait=True)
12321220
with self.executor_type(max_workers=2,
1233-
mp_context=get_context(self.ctx)) as executor:
1221+
mp_context=self.get_context()) as executor:
12341222
self.executor = executor # Allow clean up in fail_on_deadlock
12351223

12361224
# Start the executor and get the executor_manager_thread to collect
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix ``test_concurrent_futures`` to test the correct multiprocessing start method context in several cases where the test logic mixed this up.

0 commit comments

Comments
 (0)