diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index a14650bf5fa47c..2eff77f04fb921 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -518,7 +518,10 @@ def flag_executor_shutting_down(self): # to only have futures that are currently running. new_pending_work_items = {} for work_id, work_item in self.pending_work_items.items(): - if not work_item.future.cancel(): + if work_item.future.cancel(): + # gh-136655: ensure cancelled futures are notified + work_item.future.set_running_or_notify_cancel() + else: new_pending_work_items[work_id] = work_item self.pending_work_items = new_pending_work_items # Drain work_ids_queue since we no longer need to diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index a37c4d45f07b17..cbad521ea3cf81 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -39,6 +39,16 @@ def __len__(self): return 0 +class CancelNotifyTestException(Exception): + pass + + +def blocking_raiser(barrier=None): + if barrier is not None: + barrier.wait() + raise CancelNotifyTestException() + + class ExecutorTest: # Executor.shutdown() and context manager usage is tested by @@ -247,3 +257,33 @@ def test_swallows_falsey_exceptions(self): msg = 'lenlen' with self.assertRaisesRegex(FalseyLenException, msg): self.executor.submit(raiser, FalseyLenException, msg).result() + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_shutdown_notifies_cancelled_futures(self): + if self.worker_count < 2: + self.skipTest("test requires more than one worker") + + # TODO: remove when gh-109934 is fixed + if self.executor_type is futures.ThreadPoolExecutor: + self.skipTest("gh-109934: skipping thread pool executor") + + # gh-136655: ensure cancelled futures are notified + count = self.worker_count * 2 + barrier = self.create_barrier(self.worker_count + 1, timeout=1) + with self.executor as exec: + fs = [exec.submit(blocking_raiser, + barrier if index < self.worker_count else None) + for index in range(count)] + + exec.shutdown(wait=False, cancel_futures=True) + try: + barrier.wait() + except threading.BrokenBarrierError: + pass + + for future in fs: + self.assertRaises( + (CancelNotifyTestException, futures.CancelledError, threading.BrokenBarrierError), + future.result) + + self.assertIn('CANCELLED_AND_NOTIFIED', [f._state for f in fs]) diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index 2a9e55152b82d5..1e3e350bef96c4 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -80,6 +80,9 @@ def get_context(self): class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor + def create_barrier(self, count, **kwargs): + return threading.Barrier(count, **kwargs) + def create_event(self): return threading.Event() @@ -88,6 +91,9 @@ def create_event(self): class InterpreterPoolMixin(ExecutorMixin): executor_type = futures.InterpreterPoolExecutor + def create_barrier(self, count, **kwargs): + self.skipTest("InterpreterPoolExecutor doesn't support barriers") + def create_event(self): self.skipTest("InterpreterPoolExecutor doesn't support events") @@ -107,6 +113,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_barrier(self, count, **kwargs): + return self.manager.Barrier(count, **kwargs) + def create_event(self): return self.manager.Event() @@ -122,6 +131,9 @@ def get_context(self): self.skipTest("ProcessPoolExecutor unavailable on this system") return super().get_context() + def create_barrier(self, count, **kwargs): + return self.manager.Barrier(count, **kwargs) + def create_event(self): return self.manager.Event() @@ -141,6 +153,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_barrier(self, count, **kwargs): + return self.manager.Barrier(count, **kwargs) + def create_event(self): return self.manager.Event() diff --git a/Misc/NEWS.d/next/Library/2025-10-13-13-05-15.gh-issue-136655.R8fBtC.rst b/Misc/NEWS.d/next/Library/2025-10-13-13-05-15.gh-issue-136655.R8fBtC.rst new file mode 100644 index 00000000000000..4a987a4f694987 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-10-13-13-05-15.gh-issue-136655.R8fBtC.rst @@ -0,0 +1,2 @@ ++Ensure :class:`concurrent.futures.ProcessPoolExecutor` notifies any futures +it cancels on shutdown.