
Commit 9a45893

[3.10] gh-91607: Fix several test_concurrent_futures tests to actually test what they claim (GH-91600) (#91612)
* Fix test_concurrent_futures to actually test what it says.

  Many ProcessPoolExecutor-based tests were ignoring the mp_context and using the default start method instead, so we lacked proper test coverage of all of them.

  Also removes the old _prime_executor() worker delay seeding code, as it appears to serve no purpose and adds 20-30 seconds of extra latency to this already long test. It also interfered with the refactoring done to fix the above, which stops tests from needlessly creating their own executor when setUp has already created an appropriate one.

* Don't import the name from multiprocessing directly, to avoid confusion.

(cherry picked from commit 7fa3a5a)

Co-authored-by: Gregory P. Smith <[email protected]>
1 parent: 84c279b
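For context, here is a minimal sketch of the pattern this change enforces (the class and method names below are illustrative, not the test suite's actual code): each executor test mixin declares a start method, resolves it with multiprocessing.get_context(), and every ProcessPoolExecutor created by a test must receive that context via mp_context=; otherwise the executor silently falls back to the platform default and the fork/forkserver/spawn test variants all exercise the same thing.

# Illustrative sketch only -- ForkserverMixin and ExampleUsage are hypothetical names.
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


class ForkserverMixin:
    ctx = "forkserver"  # start method this mixin is meant to exercise (POSIX-only)

    def get_context(self):
        # Resolve the declared start method to a multiprocessing context object.
        return mp.get_context(self.ctx)


class ExampleUsage(ForkserverMixin):
    def run(self):
        # Passing mp_context= is what makes the intended start method actually
        # get used; ProcessPoolExecutor(max_workers=2) alone would pick the
        # platform default (e.g. fork on Linux) and defeat the mixin.
        with ProcessPoolExecutor(max_workers=2,
                                 mp_context=self.get_context()) as ex:
            return ex.submit(pow, 2, 10).result()

With this pattern, the start-method-specific mixins genuinely differ in behavior, which is the coverage gap the commit message describes.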

File tree: 2 files changed (+43, -54 lines)


Lib/test/test_concurrent_futures.py (+42, -54)
@@ -26,10 +26,10 @@
     PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
     BrokenExecutor)
 from concurrent.futures.process import BrokenProcessPool, _check_system_limits
-from multiprocessing import get_context

 import multiprocessing.process
 import multiprocessing.util
+import multiprocessing as mp


 if support.check_sanitizer(address=True, memory=True):
@@ -131,7 +131,6 @@ def setUp(self):
         self.executor = self.executor_type(
             max_workers=self.worker_count,
             **self.executor_kwargs)
-        self._prime_executor()

     def tearDown(self):
         self.executor.shutdown(wait=True)
@@ -145,15 +144,7 @@ def tearDown(self):
         super().tearDown()

     def get_context(self):
-        return get_context(self.ctx)
-
-    def _prime_executor(self):
-        # Make sure that the executor is ready to do work before running the
-        # tests. This should reduce the probability of timeouts in the tests.
-        futures = [self.executor.submit(time.sleep, 0.1)
-                   for _ in range(self.worker_count)]
-        for f in futures:
-            f.result()
+        return mp.get_context(self.ctx)


 class ThreadPoolMixin(ExecutorMixin):
@@ -276,9 +267,6 @@ def test_initializer(self):
             with self.assertRaises(BrokenExecutor):
                 self.executor.submit(get_init_status)

-    def _prime_executor(self):
-        pass
-
     @contextlib.contextmanager
     def _assert_logged(self, msg):
         if self.log_queue is not None:
@@ -365,14 +353,14 @@ def test_hang_issue12364(self):
             f.result()

     def test_cancel_futures(self):
-        executor = self.executor_type(max_workers=3)
-        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
-        executor.shutdown(cancel_futures=True)
+        assert self.worker_count <= 5, "test needs few workers"
+        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
+        self.executor.shutdown(cancel_futures=True)
         # We can't guarantee the exact number of cancellations, but we can
-        # guarantee that *some* were cancelled. With setting max_workers to 3,
-        # most of the submitted futures should have been cancelled.
+        # guarantee that *some* were cancelled. With few workers, many of
+        # the submitted futures should have been cancelled.
         cancelled = [fut for fut in fs if fut.cancelled()]
-        self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
+        self.assertGreater(len(cancelled), 20)

         # Ensure the other futures were able to finish.
         # Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -385,33 +373,32 @@ def test_cancel_futures(self):
         # Similar to the number of cancelled futures, we can't guarantee the
         # exact number that completed. But, we can guarantee that at least
         # one finished.
-        self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
+        self.assertGreater(len(others), 0)

-    def test_hang_issue39205(self):
+    def test_hang_gh83386(self):
         """shutdown(wait=False) doesn't hang at exit with running futures.

-        See https://bugs.python.org/issue39205.
+        See https://github.com/python/cpython/issues/83386.
         """
         if self.executor_type == futures.ProcessPoolExecutor:
             raise unittest.SkipTest(
-                "Hangs due to https://bugs.python.org/issue39205")
+                "Hangs, see https://github.com/python/cpython/issues/83386")

         rc, out, err = assert_python_ok('-c', """if True:
             from concurrent.futures import {executor_type}
             from test.test_concurrent_futures import sleep_and_print
             if __name__ == "__main__":
+                if {context!r}: multiprocessing.set_start_method({context!r})
                 t = {executor_type}(max_workers=3)
                 t.submit(sleep_and_print, 1.0, "apple")
                 t.shutdown(wait=False)
-            """.format(executor_type=self.executor_type.__name__))
+            """.format(executor_type=self.executor_type.__name__,
+                       context=getattr(self, 'ctx', None)))
         self.assertFalse(err)
         self.assertEqual(out.strip(), b"apple")


 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
-    def _prime_executor(self):
-        pass
-
     def test_threads_terminate(self):
         def acquire_lock(lock):
             lock.acquire()
@@ -506,14 +493,11 @@ def test_cancel_futures_wait_false(self):


 class ProcessPoolShutdownTest(ExecutorShutdownTest):
-    def _prime_executor(self):
-        pass
-
     def test_processes_terminate(self):
         def acquire_lock(lock):
             lock.acquire()

-        mp_context = get_context()
+        mp_context = self.get_context()
         sem = mp_context.Semaphore(0)
         for _ in range(3):
             self.executor.submit(acquire_lock, sem)
@@ -527,7 +511,8 @@ def acquire_lock(lock):
             p.join()

     def test_context_manager_shutdown(self):
-        with futures.ProcessPoolExecutor(max_workers=5) as e:
+        with futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context()) as e:
             processes = e._processes
             self.assertEqual(list(e.map(abs, range(-5, 5))),
                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
@@ -536,7 +521,8 @@ def test_context_manager_shutdown(self):
             p.join()

     def test_del_shutdown(self):
-        executor = futures.ProcessPoolExecutor(max_workers=5)
+        executor = futures.ProcessPoolExecutor(
+            max_workers=5, mp_context=self.get_context())
         res = executor.map(abs, range(-5, 5))
         executor_manager_thread = executor._executor_manager_thread
         processes = executor._processes
@@ -559,7 +545,8 @@ def test_del_shutdown(self):
     def test_shutdown_no_wait(self):
         # Ensure that the executor cleans up the processes when calling
         # shutdown with wait=False
-        executor = futures.ProcessPoolExecutor(max_workers=5)
+        executor = futures.ProcessPoolExecutor(
+            max_workers=5, mp_context=self.get_context())
         res = executor.map(abs, range(-5, 5))
         processes = executor._processes
         call_queue = executor._call_queue
@@ -936,7 +923,7 @@ def submit(pool):
             pool.submit(submit, pool)

         for _ in range(50):
-            with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
+            with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                 workers.submit(tuple)

@@ -1006,7 +993,7 @@ def test_traceback(self):
     def test_ressources_gced_in_workers(self):
         # Ensure that argument for a job are correctly gc-ed after the job
         # is finished
-        mgr = get_context(self.ctx).Manager()
+        mgr = self.get_context().Manager()
         obj = EventfulGCObj(mgr)
         future = self.executor.submit(id, obj)
         future.result()
@@ -1022,36 +1009,37 @@ def test_ressources_gced_in_workers(self):
         mgr.join()

     def test_saturation(self):
-        executor = self.executor_type(4)
-        mp_context = get_context()
+        executor = self.executor
+        mp_context = self.get_context()
         sem = mp_context.Semaphore(0)
         job_count = 15 * executor._max_workers
-        try:
-            for _ in range(job_count):
-                executor.submit(sem.acquire)
-            self.assertEqual(len(executor._processes), executor._max_workers)
-            for _ in range(job_count):
-                sem.release()
-        finally:
-            executor.shutdown()
+        for _ in range(job_count):
+            executor.submit(sem.acquire)
+        self.assertEqual(len(executor._processes), executor._max_workers)
+        for _ in range(job_count):
+            sem.release()

     def test_idle_process_reuse_one(self):
-        executor = self.executor_type(4)
+        executor = self.executor
+        assert executor._max_workers >= 4
         executor.submit(mul, 21, 2).result()
         executor.submit(mul, 6, 7).result()
         executor.submit(mul, 3, 14).result()
         self.assertEqual(len(executor._processes), 1)
-        executor.shutdown()

     def test_idle_process_reuse_multiple(self):
-        executor = self.executor_type(4)
+        executor = self.executor
+        assert executor._max_workers <= 5
         executor.submit(mul, 12, 7).result()
         executor.submit(mul, 33, 25)
         executor.submit(mul, 25, 26).result()
         executor.submit(mul, 18, 29)
-        self.assertLessEqual(len(executor._processes), 2)
+        executor.submit(mul, 1, 2).result()
+        executor.submit(mul, 0, 9)
+        self.assertLessEqual(len(executor._processes), 3)
         executor.shutdown()

+
 create_executor_tests(ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,
                                        ProcessPoolForkserverMixin,
@@ -1153,7 +1141,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False):
         self.executor.shutdown(wait=True)

         executor = self.executor_type(
-            max_workers=2, mp_context=get_context(self.ctx))
+            max_workers=2, mp_context=self.get_context())
         res = executor.submit(func, *args)

         if ignore_stderr:
@@ -1232,7 +1220,7 @@ def test_shutdown_deadlock(self):
         # if a worker fails after the shutdown call.
         self.executor.shutdown(wait=True)
         with self.executor_type(max_workers=2,
-                                mp_context=get_context(self.ctx)) as executor:
+                                mp_context=self.get_context()) as executor:
             self.executor = executor  # Allow clean up in fail_on_deadlock
             f = executor.submit(_crash, delay=.1)
             executor.shutdown(wait=True)
@@ -1245,7 +1233,7 @@ def test_shutdown_deadlock_pickle(self):
         # Reported in bpo-39104.
         self.executor.shutdown(wait=True)
         with self.executor_type(max_workers=2,
-                                mp_context=get_context(self.ctx)) as executor:
+                                mp_context=self.get_context()) as executor:
             self.executor = executor  # Allow clean up in fail_on_deadlock

             # Start the executor and get the executor_manager_thread to collect
NEWS entry (new file, +1 line)
@@ -0,0 +1 @@
+Fix ``test_concurrent_futures`` to test the correct multiprocessing start method context in several cases where the test logic mixed this up.
