Commit 7fa3a5a

gh-91607: Fix several test_concurrent_futures tests to actually test what they claim (#91600)

* Fix test_concurrent_futures to actually test what it says. Many ProcessPoolExecutor based tests were ignoring the mp_context and using the default start method instead, so we lacked proper test coverage of all of them. Also removes the old _prime_executor() worker delay seeding code, as it appears to serve no purpose and adds 20-30 seconds of extra latency to this already long test. It also interfered with the refactoring above, which stops tests from needlessly creating their own executor when setUp has already created an appropriate one.

* Don't import the name from multiprocessing directly, to avoid confusion.

* 📜🤖 Added by blurb_it.

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
1 parent 0ddc63b commit 7fa3a5a
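
For illustration only (not part of this diff): the pattern the fix enforces. A ProcessPoolExecutor only exercises a specific start method if it is handed a context created with that method; constructing it without mp_context silently falls back to the platform default, which is what the affected tests were doing. A minimal sketch, with "spawn" as an arbitrary example method:

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def double(x):
    return 2 * x

if __name__ == "__main__":
    # Before the fix: tests built executors like this, so the start method
    # the test class was meant to cover was never actually exercised.
    default_executor = ProcessPoolExecutor(max_workers=2)
    default_executor.shutdown()

    # After the fix: the executor is pinned to an explicit start method
    # via a context object from mp.get_context().
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as ex:
        print(ex.submit(double, 21).result())  # 42, run in a spawn worker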

2 files changed: +50 -56 lines

Lib/test/test_concurrent_futures.py (+49 -56)
@@ -26,10 +26,10 @@
     PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
     BrokenExecutor)
 from concurrent.futures.process import BrokenProcessPool, _check_system_limits
-from multiprocessing import get_context

 import multiprocessing.process
 import multiprocessing.util
+import multiprocessing as mp


 if support.check_sanitizer(address=True, memory=True):
@@ -130,7 +130,6 @@ def setUp(self):
         self.executor = self.executor_type(
             max_workers=self.worker_count,
             **self.executor_kwargs)
-        self._prime_executor()

     def tearDown(self):
         self.executor.shutdown(wait=True)
@@ -144,15 +143,7 @@ def tearDown(self):
         super().tearDown()

     def get_context(self):
-        return get_context(self.ctx)
-
-    def _prime_executor(self):
-        # Make sure that the executor is ready to do work before running the
-        # tests. This should reduce the probability of timeouts in the tests.
-        futures = [self.executor.submit(time.sleep, 0.1)
-                   for _ in range(self.worker_count)]
-        for f in futures:
-            f.result()
+        return mp.get_context(self.ctx)


 class ThreadPoolMixin(ExecutorMixin):
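
A minimal sketch of the mixin pattern the hunk above relies on (class and attribute names here are simplified stand-ins, not the actual test classes): once get_context() resolves through mp.get_context(self.ctx), every multiprocessing primitive a test needs can come from the same context as the executor itself.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

class SpawnExecutorMixin:
    # Simplified stand-in for the test suite's per-start-method mixins.
    ctx = "spawn"

    def get_context(self):
        return mp.get_context(self.ctx)

    def make_executor(self):
        # The executor and any Semaphore/Manager/Queue a test uses should
        # all come from this one context so they agree on the start method
        # being tested.
        return ProcessPoolExecutor(max_workers=2,
                                   mp_context=self.get_context())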
@@ -275,9 +266,6 @@ def test_initializer(self):
         with self.assertRaises(BrokenExecutor):
             self.executor.submit(get_init_status)

-    def _prime_executor(self):
-        pass
-
     @contextlib.contextmanager
     def _assert_logged(self, msg):
         if self.log_queue is not None:
@@ -364,14 +352,14 @@ def test_hang_issue12364(self):
             f.result()

     def test_cancel_futures(self):
-        executor = self.executor_type(max_workers=3)
-        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
-        executor.shutdown(cancel_futures=True)
+        assert self.worker_count <= 5, "test needs few workers"
+        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
+        self.executor.shutdown(cancel_futures=True)
         # We can't guarantee the exact number of cancellations, but we can
-        # guarantee that *some* were cancelled. With setting max_workers to 3,
-        # most of the submitted futures should have been cancelled.
+        # guarantee that *some* were cancelled. With few workers, many of
+        # the submitted futures should have been cancelled.
         cancelled = [fut for fut in fs if fut.cancelled()]
-        self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
+        self.assertGreater(len(cancelled), 20)

         # Ensure the other futures were able to finish.
         # Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -384,33 +372,32 @@ def test_cancel_futures(self):
         # Similar to the number of cancelled futures, we can't guarantee the
         # exact number that completed. But, we can guarantee that at least
         # one finished.
-        self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
+        self.assertGreater(len(others), 0)

-    def test_hang_issue39205(self):
+    def test_hang_gh83386(self):
         """shutdown(wait=False) doesn't hang at exit with running futures.

-        See https://bugs.python.org/issue39205.
+        See https://github.com/python/cpython/issues/83386.
         """
         if self.executor_type == futures.ProcessPoolExecutor:
             raise unittest.SkipTest(
-                "Hangs due to https://bugs.python.org/issue39205")
+                "Hangs, see https://github.com/python/cpython/issues/83386")

         rc, out, err = assert_python_ok('-c', """if True:
             from concurrent.futures import {executor_type}
             from test.test_concurrent_futures import sleep_and_print
             if __name__ == "__main__":
+                if {context!r}: multiprocessing.set_start_method({context!r})
                 t = {executor_type}(max_workers=3)
                 t.submit(sleep_and_print, 1.0, "apple")
                 t.shutdown(wait=False)
-            """.format(executor_type=self.executor_type.__name__))
+            """.format(executor_type=self.executor_type.__name__,
+                       context=getattr(self, 'ctx', None)))
         self.assertFalse(err)
         self.assertEqual(out.strip(), b"apple")


 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
-    def _prime_executor(self):
-        pass
-
     def test_threads_terminate(self):
         def acquire_lock(lock):
             lock.acquire()
@@ -505,14 +492,11 @@ def test_cancel_futures_wait_false(self):


 class ProcessPoolShutdownTest(ExecutorShutdownTest):
-    def _prime_executor(self):
-        pass
-
     def test_processes_terminate(self):
         def acquire_lock(lock):
             lock.acquire()

-        mp_context = get_context()
+        mp_context = self.get_context()
         sem = mp_context.Semaphore(0)
         for _ in range(3):
             self.executor.submit(acquire_lock, sem)
@@ -526,7 +510,8 @@ def acquire_lock(lock):
             p.join()

     def test_context_manager_shutdown(self):
-        with futures.ProcessPoolExecutor(max_workers=5) as e:
+        with futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context()) as e:
             processes = e._processes
             self.assertEqual(list(e.map(abs, range(-5, 5))),
                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
@@ -535,7 +520,8 @@ def test_context_manager_shutdown(self):
             p.join()

     def test_del_shutdown(self):
-        executor = futures.ProcessPoolExecutor(max_workers=5)
+        executor = futures.ProcessPoolExecutor(
+            max_workers=5, mp_context=self.get_context())
         res = executor.map(abs, range(-5, 5))
         executor_manager_thread = executor._executor_manager_thread
         processes = executor._processes
@@ -558,7 +544,8 @@ def test_del_shutdown(self):
     def test_shutdown_no_wait(self):
         # Ensure that the executor cleans up the processes when calling
         # shutdown with wait=False
-        executor = futures.ProcessPoolExecutor(max_workers=5)
+        executor = futures.ProcessPoolExecutor(
+            max_workers=5, mp_context=self.get_context())
         res = executor.map(abs, range(-5, 5))
         processes = executor._processes
         call_queue = executor._call_queue
@@ -935,7 +922,7 @@ def submit(pool):
            pool.submit(submit, pool)

        for _ in range(50):
-            with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
+            with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                workers.submit(tuple)


@@ -1005,7 +992,7 @@ def test_traceback(self):
     def test_ressources_gced_in_workers(self):
         # Ensure that argument for a job are correctly gc-ed after the job
         # is finished
-        mgr = get_context(self.ctx).Manager()
+        mgr = self.get_context().Manager()
         obj = EventfulGCObj(mgr)
         future = self.executor.submit(id, obj)
         future.result()
@@ -1021,38 +1008,41 @@ def test_ressources_gced_in_workers(self):
         mgr.join()

     def test_saturation(self):
-        executor = self.executor_type(4)
-        mp_context = get_context()
+        executor = self.executor
+        mp_context = self.get_context()
         sem = mp_context.Semaphore(0)
         job_count = 15 * executor._max_workers
-        try:
-            for _ in range(job_count):
-                executor.submit(sem.acquire)
-            self.assertEqual(len(executor._processes), executor._max_workers)
-            for _ in range(job_count):
-                sem.release()
-        finally:
-            executor.shutdown()
+        for _ in range(job_count):
+            executor.submit(sem.acquire)
+        self.assertEqual(len(executor._processes), executor._max_workers)
+        for _ in range(job_count):
+            sem.release()

     def test_idle_process_reuse_one(self):
-        executor = self.executor_type(4)
+        executor = self.executor
+        assert executor._max_workers >= 4
         executor.submit(mul, 21, 2).result()
         executor.submit(mul, 6, 7).result()
         executor.submit(mul, 3, 14).result()
         self.assertEqual(len(executor._processes), 1)
-        executor.shutdown()

     def test_idle_process_reuse_multiple(self):
-        executor = self.executor_type(4)
+        executor = self.executor
+        assert executor._max_workers <= 5
         executor.submit(mul, 12, 7).result()
         executor.submit(mul, 33, 25)
         executor.submit(mul, 25, 26).result()
         executor.submit(mul, 18, 29)
-        self.assertLessEqual(len(executor._processes), 2)
+        executor.submit(mul, 1, 2).result()
+        executor.submit(mul, 0, 9)
+        self.assertLessEqual(len(executor._processes), 3)
         executor.shutdown()

     def test_max_tasks_per_child(self):
-        executor = self.executor_type(1, max_tasks_per_child=3)
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(
+            1, mp_context=self.get_context(), max_tasks_per_child=3)
         f1 = executor.submit(os.getpid)
         original_pid = f1.result()
         # The worker pid remains the same as the worker could be reused
@@ -1072,7 +1062,10 @@ def test_max_tasks_per_child(self):
         executor.shutdown()

     def test_max_tasks_early_shutdown(self):
-        executor = self.executor_type(3, max_tasks_per_child=1)
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(
+            3, mp_context=self.get_context(), max_tasks_per_child=1)
         futures = []
         for i in range(6):
             futures.append(executor.submit(mul, i, i))
@@ -1182,7 +1175,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False):
         self.executor.shutdown(wait=True)

         executor = self.executor_type(
-            max_workers=2, mp_context=get_context(self.ctx))
+            max_workers=2, mp_context=self.get_context())
         res = executor.submit(func, *args)

         if ignore_stderr:
@@ -1261,7 +1254,7 @@ def test_shutdown_deadlock(self):
         # if a worker fails after the shutdown call.
         self.executor.shutdown(wait=True)
         with self.executor_type(max_workers=2,
-                                mp_context=get_context(self.ctx)) as executor:
+                                mp_context=self.get_context()) as executor:
             self.executor = executor  # Allow clean up in fail_on_deadlock
             f = executor.submit(_crash, delay=.1)
             executor.shutdown(wait=True)
@@ -1274,7 +1267,7 @@ def test_shutdown_deadlock_pickle(self):
         # Reported in bpo-39104.
         self.executor.shutdown(wait=True)
         with self.executor_type(max_workers=2,
-                                mp_context=get_context(self.ctx)) as executor:
+                                mp_context=self.get_context()) as executor:
             self.executor = executor  # Allow clean up in fail_on_deadlock

             # Start the executor and get the executor_manager_thread to collect
NEWS blurb (new file, +1)

@@ -0,0 +1 @@
+Fix ``test_concurrent_futures`` to test the correct multiprocessing start method context in several cases where the test logic mixed this up.
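
As background for the blurb, a sketch of covering every start method a platform offers; the helper name below is hypothetical and not part of the test suite:

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def run_under_each_start_method(fn, *args):
    # Hypothetical helper: run fn once per available start method,
    # each time under an executor pinned to that method.
    results = {}
    for method in mp.get_all_start_methods():  # e.g. fork, spawn, forkserver
        ctx = mp.get_context(method)
        with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as ex:
            results[method] = ex.submit(fn, *args).result()
    return results

if __name__ == "__main__":
    print(run_under_each_start_method(pow, 2, 8))  # e.g. {'fork': 256, 'spawn': 256, ...}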
