From afa2c7d6b4d3c89ac5326d73b3b4277f25a95324 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sat, 18 Feb 2017 01:59:42 -0500 Subject: [PATCH 01/15] TST no deadlocks with pickling crashes - TST crash in CallItem unpickling - TST crash in func call run - TST crash in result pickling the test include crashes with PythonError/SystemExist/SegFault Also add more tests for race condition when a worker crashes --- Lib/concurrent/futures/process.py | 5 +- Lib/test/test_concurrent_futures.py | 234 ++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 35af65d0beeea6..c1f72dd24a891b 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -264,7 +264,10 @@ def shutdown_worker(): # This is an upper bound nb_children_alive = sum(p.is_alive() for p in processes.values()) for i in range(0, nb_children_alive): - call_queue.put_nowait(None) + try: + call_queue.put_nowait(None) + except Full: + pass # Release the queue's resources as soon as possible. call_queue.close() # If .join() is not called on the created processes then diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 296398f0d948e4..e2fbd33f4873d9 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -759,6 +759,240 @@ def test_ressources_gced_in_workers(self): ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)) +def hide_process_stderr(): + import io + setattr(sys, "stderr", io.StringIO()) + + +def _crash(): + """Induces a segfault""" + import faulthandler + faulthandler.disable() + faulthandler._sigsegv() + + +def _exit(): + """Induces a sys exit with exitcode 1""" + sys.exit(1) + + +def _raise_error(Err): + """Function that raises an Exception in process""" + hide_process_stderr() + raise Err() + + +def _return_instance(cls): + """Function that returns a instance of cls""" + hide_process_stderr() + return cls() + + +class CrashAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _crash() + + +class CrashAtUnpickle(object): + """Bad object that triggers a segfault at unpickling time.""" + def __reduce__(self): + return _crash, () + + +class ExitAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _exit() + + +class ExitAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + return _exit, () + + +class ErrorAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + from pickle import PicklingError + raise PicklingError("Error in pickle") + + +class ErrorAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + from pickle import UnpicklingError + return _raise_error, (UnpicklingError, ) + + +class TimingWrapper(object): + """Creates a wrapper for a function which records the time it takes to + finish + """ + + def __init__(self, func): + self.func = func + self.elapsed = None + + def __call__(self, *args, **kwds): + t = time.time() + try: + return self.func(*args, **kwds) + finally: + self.elapsed = time.time() - t + + +class ExecutorDeadlockTest: + # If ExecutorDeadlockTest takes more than 100secs to complete, it is very + # likely caught in a deadlock. As there is no easy way to detect it, + # faulthandler will print the traceback and exit. + TIMEOUT = 15 + + @classmethod + def _sleep_id(cls, x, delay): + time.sleep(delay) + return x + + def _fail_on_deadlock(self, executor): + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + import faulthandler + from tempfile import TemporaryFile + with TemporaryFile(mode="w+") as f: + faulthandler.dump_traceback(file=f) + f.seek(0) + tb = f.read() + executor.shutdown(wait=True, kill_workers=True) + print(f"\nTraceback:\n {tb}", file=sys.__stderr__) + self.fail(f"Deadlock executor:\n\n{tb}") + + def test_crash(self): + # extensive testing for deadlock caused by crash in a pool + crash_cases = [ + # Check problem occuring while unpickling a task on workers + (id, (ExitAtUnpickle(),), BrokenProcessPool, + "exit at task unpickle"), + (id, (ErrorAtUnpickle(),), BrokenProcessPool, + "error at task unpickle"), + (id, (CrashAtUnpickle(),), BrokenProcessPool, + "crash at task unpickle"), + # Check problem occuring during func execution on workers + (_crash, (), BrokenProcessPool, + "crash during func execution on worker"), + (_exit, (), SystemExit, + "exit during func execution on worker"), + (_raise_error, (RuntimeError, ), RuntimeError, + "error during func execution on worker"), + # Check problem occuring while pickling a task result + # on workers + (_return_instance, (CrashAtPickle,), BrokenProcessPool, + "crash during result pickle on worker"), + (_return_instance, (ExitAtPickle,), BrokenProcessPool, + "exit during result pickle on worker"), + (_return_instance, (ErrorAtPickle,), BrokenProcessPool, + "error during result pickle on worker"), + ] + for func, args, error, name in crash_cases: + with self.subTest(name): + # skip the test involving pickle errors with manager as it + # breaks the manager and not the pool in this cases + # skip the test involving pickle errors with thread as the + # tasks and results are not pickled in this case + with test.support.captured_stderr(): + executor = self.executor_type( + max_workers=2, mp_context=get_context(self.ctx)) + res = executor.submit(func, *args) + with self.assertRaises(error): + try: + res.result(timeout=self.TIMEOUT) + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + @classmethod + def _test_getpid(cls, a): + return os.getpid() + + @classmethod + def _test_kill_worker(cls, pid=None, delay=0.01): + """Function that send SIGKILL at process pid after delay second""" + time.sleep(delay) + if pid is None: + pid = os.getpid() + try: + from signal import SIGKILL + except ImportError: + from signal import SIGTERM as SIGKILL + try: + os.kill(pid, SIGKILL) + time.sleep(.01) + except (ProcessLookupError, PermissionError): + pass + + def test_crash_races(self): + + from itertools import repeat + for n_proc in [1, 2, 5, 17]: + with self.subTest(n_proc=n_proc): + # Test for external crash signal comming from neighbor + # with various race setup + executor = self.executor_type( + max_workers=2, mp_context=get_context(self.ctx)) + try: + raise AttributeError() + pids = [p.pid for p in executor._processes] + assert len(pids) == n_proc + except AttributeError: + pids = [pid for pid in executor.map( + self._test_getpid, [None] * n_proc)] + assert None not in pids + res = self.executor.map( + self._sleep_id, repeat(True, 2 * n_proc), + [.001 * (j // 2) for j in range(2 * n_proc)], + chunksize=1) + assert all(res) + res = executor.map(self._test_kill_worker, pids[::-1], + timeout=self.TIMEOUT) + with self.assertRaises(BrokenProcessPool): + try: + [v for v in res] + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + def test_shutdown_deadlock(self): + # Test that the pool calling shutdown do not cause deadlock + # if a worker failed + + with self.executor_type(max_workers=2, + mp_context=get_context(self.ctx)) as executor: + executor.submit(self._test_kill_worker, ()) + time.sleep(.01) + executor.shutdown() + + +class ProcessPoolForkExecutorDeadlockTest(ProcessPoolForkMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + + +class ProcessPoolForkserverExecutorDeadlockTest(ProcessPoolForkserverMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + + +class ProcessPoolSpawnExecutorDeadlockTest(ProcessPoolSpawnMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + class FutureTests(BaseTestCase): def test_done_callback_with_result(self): From f99ce5e453755cbcf40138056b84f9b36abbf761 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sat, 18 Feb 2017 07:29:37 -0500 Subject: [PATCH 02/15] ENH don't fail on result pickling errors --- Lib/concurrent/futures/process.py | 93 ++++++++++++++++++++++------- Lib/multiprocessing/queues.py | 21 +++++-- Lib/test/_test_multiprocessing.py | 36 +++++++++++ Lib/test/test_concurrent_futures.py | 24 ++++++-- 4 files changed, 142 insertions(+), 32 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index c1f72dd24a891b..17ca291f1e69ec 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -8,10 +8,10 @@ |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | +| | => | Work Ids | | | | Call Q | | Process | +| | +----------+ | | +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | @@ -52,6 +52,7 @@ from queue import Full import multiprocessing as mp from multiprocessing.connection import wait +from multiprocessing.queues import Queue import threading import weakref from functools import partial @@ -90,6 +91,7 @@ def _python_exit(): # (Futures in the call queue cannot be cancelled). EXTRA_QUEUED_CALLS = 1 + # Hack to embed stringification of remote traceback in local traceback class _RemoteTraceback(Exception): @@ -132,6 +134,25 @@ def __init__(self, work_id, fn, args, kwargs): self.kwargs = kwargs +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items): + self.pending_work_items = pending_work_items + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = traceback.format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + work_item.future.set_exception(e) + del work_item + else: + super()._on_queue_feeder_error(e, obj) + + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ it = zip(*iterables) @@ -152,6 +173,17 @@ def _process_chunk(fn, chunk): """ return [fn(*args) for args in chunk] + +def _sendback_result(result_queue, work_id, result=None, exception=None): + """Safely send back the given result or exception""" + try: + result_queue.put(_ResultItem(work_id, result=result, + exception=exception)) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(work_id, exception=exc)) + + def _process_worker(call_queue, result_queue, initializer, initargs): """Evaluates calls from call_queue and places the results in result_queue. @@ -183,10 +215,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs): r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - result_queue.put(_ResultItem(call_item.work_id, exception=exc)) + _sendback_result(result_queue, call_item.work_id, exception=exc) else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + _sendback_result(result_queue, call_item.work_id, result=r) # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore @@ -285,9 +316,15 @@ def shutdown_worker(): sentinels = [p.sentinel for p in processes.values()] assert sentinels ready = wait([reader] + sentinels) + + received_item = False if reader in ready: - result_item = reader.recv() - else: + try: + result_item = reader.recv() + received_item = True + except: + pass + if not received_item: # Mark the process pool broken so that submits fail right now. executor = executor_reference() if executor is not None: @@ -332,6 +369,9 @@ def shutdown_worker(): work_item.future.set_result(result_item.result) # Delete references to object. See issue16284 del work_item + # Delete reference to result_item + del result_item + # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: @@ -351,8 +391,11 @@ def shutdown_worker(): pass executor = None + _system_limits_checked = False _system_limited = None + + def _check_system_limits(): global _system_limits_checked, _system_limited if _system_limits_checked: @@ -418,6 +461,7 @@ def __init__(self, max_workers=None, mp_context=None, raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers + if mp_context is None: mp_context = mp.get_context() self._mp_context = mp_context @@ -427,18 +471,9 @@ def __init__(self, max_workers=None, mp_context=None, self._initializer = initializer self._initargs = initargs - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - queue_size = self._max_workers + EXTRA_QUEUED_CALLS - self._call_queue = mp_context.Queue(queue_size) - # Killed worker processes can produce spurious "broken pipe" - # tracebacks in the queue's own worker thread. But we detect killed - # processes anyway, so silence the tracebacks. - self._call_queue._ignore_epipe = True - self._result_queue = mp_context.SimpleQueue() - self._work_ids = queue.Queue() + # Management threads self._queue_management_thread = None + # Map of pids to processes self._processes = {} @@ -449,6 +484,21 @@ def __init__(self, max_workers=None, mp_context=None, self._queue_count = 0 self._pending_work_items = {} + # Create communication channels for the executor + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + queue_size = self._max_workers + EXTRA_QUEUED_CALLS + self._call_queue = _SafeQueue( + max_size=queue_size, ctx=self._mp_context, + pending_work_items=self._pending_work_items) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True + self._result_queue = mp_context.SimpleQueue() + self._work_ids = queue.Queue() + def _start_queue_management_thread(self): # When the executor gets lost, the weakref callback will wake up # the queue management thread. @@ -464,7 +514,8 @@ def weakref_cb(_, q=self._result_queue): self._pending_work_items, self._work_ids, self._call_queue, - self._result_queue)) + self._result_queue), + name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() _threads_queues[self._queue_management_thread] = self._result_queue diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 328efbd95fe63d..d66d37a5c3e2eb 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -160,9 +160,10 @@ def _start_thread(self): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe), + self._wlock, self._writer.close, self._ignore_epipe, + self._on_queue_feeder_error), name='QueueFeederThread' - ) + ) self._thread.daemon = True debug('doing self._thread.start()') @@ -201,7 +202,8 @@ def _finalize_close(buffer, notempty): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, + onerror): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -253,8 +255,17 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): info('error in queue thread: %s', e) return else: - import traceback - traceback.print_exc() + onerror(e, obj) + + @staticmethod + def _on_queue_feeder_error(e, obj): + """ + Private API hook called when feeding data in the background thread + raises an exception. For overriding by concurrent.futures. + """ + import traceback + traceback.print_exc() + _sentinel = object() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index dbca2d89ed14e4..b2e96a51141d58 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1029,6 +1029,42 @@ def __reduce__(self): self.assertTrue(q.get(timeout=1.0)) close_queue(q) + def test_queue_feeder_on_queue_feeder_error(self): + # bpo-30006: verify feeder handles exceptions using the + # _on_queue_feeder_error hook. + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + class NotSerializable(object): + """Mock unserializable object""" + def __init__(self): + self.reduce_was_called = False + self.on_queue_feeder_error_was_called = False + + def __reduce__(self): + self.reduce_was_called = True + raise AttributeError + + class SafeQueue(multiprocessing.queues.Queue): + """Queue with overloaded _on_queue_feeder_error hook""" + @staticmethod + def _on_queue_feeder_error(e, obj): + if (isinstance(e, AttributeError) and + isinstance(obj, NotSerializable)): + obj.on_queue_feeder_error_was_called = True + + not_serializable_obj = NotSerializable() + with test.support.captured_stderr(): + q = SafeQueue(ctx=multiprocessing.get_context()) + q.put(not_serializable_obj) + + # Verify that q is still functionning correctly + q.put(True) + self.assertTrue(q.get(timeout=1.0)) + + # Assert that the serialization and the hook have been called correctly + self.assertTrue(not_serializable_obj.reduce_was_called) + self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) # # # diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index e2fbd33f4873d9..42ef43d39cdd90 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -15,6 +15,7 @@ import time import unittest import weakref +from pickle import PicklingError from concurrent import futures from concurrent.futures._base import ( @@ -369,12 +370,15 @@ def test_del_shutdown(self): queue_management_thread = executor._queue_management_thread processes = executor._processes call_queue = executor._call_queue + queue_management_thread = executor._queue_management_thread del executor queue_management_thread.join() for p in processes.values(): p.join() - call_queue.close() + # Make sure that the queue management thread was properly finished + # and the queue was closed by the shutdown process + queue_management_thread.join() call_queue.join_thread() @@ -830,7 +834,6 @@ class TimingWrapper(object): """Creates a wrapper for a function which records the time it takes to finish """ - def __init__(self, func): self.func = func self.elapsed = None @@ -863,13 +866,18 @@ def _fail_on_deadlock(self, executor): faulthandler.dump_traceback(file=f) f.seek(0) tb = f.read() - executor.shutdown(wait=True, kill_workers=True) + for p in executor._processes.values(): + p.terminate() + executor.shutdown(wait=True) print(f"\nTraceback:\n {tb}", file=sys.__stderr__) self.fail(f"Deadlock executor:\n\n{tb}") def test_crash(self): # extensive testing for deadlock caused by crash in a pool crash_cases = [ + # Check problem occuring while pickling a task in + # the task_handler thread + (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"), # Check problem occuring while unpickling a task on workers (id, (ExitAtUnpickle(),), BrokenProcessPool, "exit at task unpickle"), @@ -888,10 +896,14 @@ def test_crash(self): # on workers (_return_instance, (CrashAtPickle,), BrokenProcessPool, "crash during result pickle on worker"), - (_return_instance, (ExitAtPickle,), BrokenProcessPool, + (_return_instance, (ExitAtPickle,), SystemExit, "exit during result pickle on worker"), - (_return_instance, (ErrorAtPickle,), BrokenProcessPool, + (_return_instance, (ErrorAtPickle,), PicklingError, "error during result pickle on worker"), + # Check problem occuring while unpickling a task in + # the result_handler thread + (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool, + "error during result unpickle in result_handler") ] for func, args, error, name in crash_cases: with self.subTest(name): @@ -973,7 +985,7 @@ def test_shutdown_deadlock(self): mp_context=get_context(self.ctx)) as executor: executor.submit(self._test_kill_worker, ()) time.sleep(.01) - executor.shutdown() + executor.shutdown(wait=True) class ProcessPoolForkExecutorDeadlockTest(ProcessPoolForkMixin, From 5a27c119f70d34b6ea078e57c7c3d5fb75218906 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Thu, 5 Oct 2017 11:06:58 +0200 Subject: [PATCH 03/15] DOC add NEWS entry --- .../next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst diff --git a/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst new file mode 100644 index 00000000000000..d5c4c6eee50f9b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst @@ -0,0 +1,3 @@ +Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor`, caused by +pickling or unpickling errors. This should make sure that calls to the +:class:`ProcessPoolExecutor` API always return. From 04e1dc11a1aa9e547ba4b4b38e6c20341b7b5994 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 3 Nov 2017 13:44:16 +0100 Subject: [PATCH 04/15] ENH independent com channel for wakeup to avoid deadlocks in shutdown --- Lib/concurrent/futures/process.py | 108 +++++++++++++++++++++------- Lib/test/test_concurrent_futures.py | 20 ++---- 2 files changed, 85 insertions(+), 43 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 17ca291f1e69ec..d1ec982cbbfa13 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -76,12 +76,36 @@ _threads_queues = weakref.WeakKeyDictionary() _global_shutdown = False + +# This constants control the maximal wakeup. If a job is submitted to the +# Executor, it might take up to _POLL_TIMEOUT for the executor to notice and +# start launching the job. This _POLL_TIMEOUT is to be cumulated with the +# communication overhead. +_POLL_TIMEOUT = .001 + + +class _Sentinel: + __slot__ = ["_state"] + + def __init__(self): + self._state = False + + def set(self): + self._state = True + + def get_and_unset(self): + s = self._state + if s: + self._state = False + return s + + def _python_exit(): global _global_shutdown _global_shutdown = True items = list(_threads_queues.items()) - for t, q in items: - q.put(None) + for t, wakeup in items: + wakeup.set() for t, q in items: t.join() @@ -266,7 +290,8 @@ def _queue_management_worker(executor_reference, pending_work_items, work_ids_queue, call_queue, - result_queue): + result_queue, + wakeup): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -284,6 +309,8 @@ def _queue_management_worker(executor_reference, derived from _WorkItems for processing by the process workers. result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. + wakeup: A _Sentinel to allow waking up the queue_manager_thread from + the main Thread and avoid deadlocks caused by broken queues. """ executor = None @@ -292,13 +319,21 @@ def shutting_down(): or executor._shutdown_thread) def shutdown_worker(): - # This is an upper bound - nb_children_alive = sum(p.is_alive() for p in processes.values()) - for i in range(0, nb_children_alive): - try: - call_queue.put_nowait(None) - except Full: - pass + # This is an upper bound on the number of children alive. + n_children_alive = sum(p.is_alive() for p in processes.values()) + n_children_to_stop = n_children_alive + n_sentinels_sent = 0 + # Sent the right number of sentinels, to make sure all children are + # properly terminated. + while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: + for i in range(n_children_to_stop - n_sentinels_sent): + try: + call_queue.put_nowait(None) + n_sentinels_sent += 1 + except Full: + break + n_children_alive = sum(p.is_alive() for p in processes.values()) + # Release the queue's resources as soon as possible. call_queue.close() # If .join() is not called on the created processes then @@ -306,24 +341,34 @@ def shutdown_worker(): for p in processes.values(): p.join() - reader = result_queue._reader + result_reader = result_queue._reader while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - sentinels = [p.sentinel for p in processes.values()] - assert sentinels - ready = wait([reader] + sentinels) - + # Wait for a result to be ready in the result_queue while checking + # that worker process are still running. + worker_sentinels = [p.sentinel for p in processes.values()] received_item = False - if reader in ready: + while not wakeup.get_and_unset(): + ready = wait([result_reader] + worker_sentinels, + timeout=_POLL_TIMEOUT) + if len(ready) > 0: + break + else: + # The thread has been woken up by the main thread or the gc. + ready = [] + result_item = None + received_item = True + + if result_reader in ready: try: - result_item = reader.recv() + result_item = result_reader.recv() received_item = True - except: - pass + except BaseException as e: + traceback.print_exc() if not received_item: # Mark the process pool broken so that submits fail right now. executor = executor_reference() @@ -499,12 +544,20 @@ def __init__(self, max_workers=None, mp_context=None, self._result_queue = mp_context.SimpleQueue() self._work_ids = queue.Queue() + # Permits to wake_up the queue_manager_thread independently of + # result_queue state. This avoid deadlocks caused by the non + # transmission of wakeup signal when a worker died with the + # _result_queue write lock. + self._wakeup = _Sentinel() + def _start_queue_management_thread(self): - # When the executor gets lost, the weakref callback will wake up - # the queue management thread. - def weakref_cb(_, q=self._result_queue): - q.put(None) if self._queue_management_thread is None: + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, wakeup=self._wakeup): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + wakeup.set() # Start the processes so that their sentinels are known. self._adjust_process_count() self._queue_management_thread = threading.Thread( @@ -514,11 +567,12 @@ def weakref_cb(_, q=self._result_queue): self._pending_work_items, self._work_ids, self._call_queue, - self._result_queue), + self._result_queue, + self._wakeup), name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_queues[self._queue_management_thread] = self._result_queue + _threads_queues[self._queue_management_thread] = self._wakeup def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -545,7 +599,7 @@ def submit(self, fn, *args, **kwargs): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._result_queue.put(None) + self._wakeup.set() self._start_queue_management_thread() return f @@ -585,7 +639,7 @@ def shutdown(self, wait=True): self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread - self._result_queue.put(None) + self._wakeup.set() if wait: self._queue_management_thread.join() # To reduce the risk of opening too many files, remove references to diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 42ef43d39cdd90..64db83eb1062c3 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -988,22 +988,10 @@ def test_shutdown_deadlock(self): executor.shutdown(wait=True) -class ProcessPoolForkExecutorDeadlockTest(ProcessPoolForkMixin, - ExecutorDeadlockTest, - unittest.TestCase): - pass - - -class ProcessPoolForkserverExecutorDeadlockTest(ProcessPoolForkserverMixin, - ExecutorDeadlockTest, - unittest.TestCase): - pass - - -class ProcessPoolSpawnExecutorDeadlockTest(ProcessPoolSpawnMixin, - ExecutorDeadlockTest, - unittest.TestCase): - pass +create_executor_tests(ExecutorDeadlockTest, + executor_mixins=(ProcessPoolForkMixin, + ProcessPoolForkserverMixin, + ProcessPoolSpawnMixin)) class FutureTests(BaseTestCase): From b43db525548f9ae281a4b706cd6a1f763c0f4359 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Tue, 21 Nov 2017 17:24:08 +0100 Subject: [PATCH 05/15] CLN correct some typos --- Lib/concurrent/futures/process.py | 24 +++++++++++++----------- Lib/test/test_concurrent_futures.py | 2 +- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index d1ec982cbbfa13..ecb7fd06866385 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -73,11 +73,11 @@ # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_threads_queues = weakref.WeakKeyDictionary() +_threads_sentinels = weakref.WeakKeyDictionary() _global_shutdown = False -# This constants control the maximal wakeup. If a job is submitted to the +# This constant controls the maximal wakeup. If a job is submitted to the # Executor, it might take up to _POLL_TIMEOUT for the executor to notice and # start launching the job. This _POLL_TIMEOUT is to be cumulated with the # communication overhead. @@ -103,7 +103,7 @@ def get_and_unset(self): def _python_exit(): global _global_shutdown _global_shutdown = True - items = list(_threads_queues.items()) + items = list(_threads_sentinels.items()) for t, wakeup in items: wakeup.set() for t, q in items: @@ -349,7 +349,7 @@ def shutdown_worker(): call_queue) # Wait for a result to be ready in the result_queue while checking - # that worker process are still running. + # that all worker processes are still running. worker_sentinels = [p.sentinel for p in processes.values()] received_item = False while not wakeup.get_and_unset(): @@ -460,7 +460,8 @@ def _check_system_limits(): # minimum number of semaphores available # according to POSIX return - _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + _system_limited = ("system provides too few semaphores (%d" + " available, 256 necessary)" % nsems_max) raise NotImplementedError(_system_limited) @@ -548,13 +549,13 @@ def __init__(self, max_workers=None, mp_context=None, # result_queue state. This avoid deadlocks caused by the non # transmission of wakeup signal when a worker died with the # _result_queue write lock. - self._wakeup = _Sentinel() + self._queue_management_thread_sentinel = _Sentinel() def _start_queue_management_thread(self): if self._queue_management_thread is None: # When the executor gets lost, the weakref callback will wake up # the queue management thread. - def weakref_cb(_, wakeup=self._wakeup): + def weakref_cb(_, wakeup=self._queue_management_thread_sentinel): mp.util.debug('Executor collected: triggering callback for' ' QueueManager wakeup') wakeup.set() @@ -568,11 +569,12 @@ def weakref_cb(_, wakeup=self._wakeup): self._work_ids, self._call_queue, self._result_queue, - self._wakeup), + self._queue_management_thread_sentinel), name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_queues[self._queue_management_thread] = self._wakeup + _threads_sentinels[self._queue_management_thread] = \ + self._queue_management_thread_sentinel def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -599,7 +601,7 @@ def submit(self, fn, *args, **kwargs): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._wakeup.set() + self._queue_management_thread_sentinel.set() self._start_queue_management_thread() return f @@ -639,7 +641,7 @@ def shutdown(self, wait=True): self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread - self._wakeup.set() + self._queue_management_thread_sentinel.set() if wait: self._queue_management_thread.join() # To reduce the risk of opening too many files, remove references to diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 64db83eb1062c3..2cef62fd9a5ce6 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -870,7 +870,7 @@ def _fail_on_deadlock(self, executor): p.terminate() executor.shutdown(wait=True) print(f"\nTraceback:\n {tb}", file=sys.__stderr__) - self.fail(f"Deadlock executor:\n\n{tb}") + self.fail(f"Executor deadlock:\n\n{tb}") def test_crash(self): # extensive testing for deadlock caused by crash in a pool From f687bf3bb264b543cc0f4341c65fe3c7c0d492f4 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Wed, 22 Nov 2017 17:01:45 +0100 Subject: [PATCH 06/15] CLN improve test_crash_races --- Lib/test/test_concurrent_futures.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 2cef62fd9a5ce6..941ca369df4415 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -938,11 +938,13 @@ def _test_kill_worker(cls, pid=None, delay=0.01): from signal import SIGKILL except ImportError: from signal import SIGTERM as SIGKILL + # Try to kill a process in the pool, if it is not finished yet try: os.kill(pid, SIGKILL) - time.sleep(.01) except (ProcessLookupError, PermissionError): pass + # Give some time for the Executor to detect the failure + time.sleep(.5) def test_crash_races(self): From 5bb0bd2442cdc293c85d10bb8e5282a5419de402 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sat, 25 Nov 2017 13:26:46 +0100 Subject: [PATCH 07/15] ENH remove polling in queue_management_thread - Use a to communicate between MainThread and QueueManager as it is waitable and avoid polling in the thread. - Add timeout on ExecutorTest to avoid deadlocks freezing the test suite. - Rename the object to . --- Lib/concurrent/futures/process.py | 77 +++++++++++++---------------- Lib/test/test_concurrent_futures.py | 69 +++++++++++--------------- 2 files changed, 63 insertions(+), 83 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index ecb7fd06866385..1ef489b4e48c17 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -73,39 +73,30 @@ # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_threads_sentinels = weakref.WeakKeyDictionary() +_threads_wakeup = weakref.WeakKeyDictionary() _global_shutdown = False -# This constant controls the maximal wakeup. If a job is submitted to the -# Executor, it might take up to _POLL_TIMEOUT for the executor to notice and -# start launching the job. This _POLL_TIMEOUT is to be cumulated with the -# communication overhead. -_POLL_TIMEOUT = .001 - - -class _Sentinel: +class _ThreadWakeup: __slot__ = ["_state"] def __init__(self): - self._state = False + self._reader, self._writer = mp.Pipe(duplex=False) def set(self): - self._state = True + self._writer.send_bytes(b"") - def get_and_unset(self): - s = self._state - if s: - self._state = False - return s + def clear(self): + while self._reader.poll(): + self._reader.recv_bytes() def _python_exit(): global _global_shutdown _global_shutdown = True - items = list(_threads_sentinels.items()) - for t, wakeup in items: - wakeup.set() + items = list(_threads_wakeup.items()) + for t, thread_wakeup in items: + thread_wakeup.set() for t, q in items: t.join() @@ -285,13 +276,14 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue + def _queue_management_worker(executor_reference, processes, pending_work_items, work_ids_queue, call_queue, result_queue, - wakeup): + thread_wakeup): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -309,8 +301,9 @@ def _queue_management_worker(executor_reference, derived from _WorkItems for processing by the process workers. result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. - wakeup: A _Sentinel to allow waking up the queue_manager_thread from - the main Thread and avoid deadlocks caused by broken queues. + thread_wakeup: A _ThreadWakeup to allow waking up the + queue_manager_thread from the main Thread and avoid deadlocks + caused by broken queues. """ executor = None @@ -342,6 +335,8 @@ def shutdown_worker(): p.join() result_reader = result_queue._reader + wakeup_reader = thread_wakeup._reader + readers = [result_reader, wakeup_reader] while True: _add_call_item_to_queue(pending_work_items, @@ -351,25 +346,20 @@ def shutdown_worker(): # Wait for a result to be ready in the result_queue while checking # that all worker processes are still running. worker_sentinels = [p.sentinel for p in processes.values()] - received_item = False - while not wakeup.get_and_unset(): - ready = wait([result_reader] + worker_sentinels, - timeout=_POLL_TIMEOUT) - if len(ready) > 0: - break - else: - # The thread has been woken up by the main thread or the gc. - ready = [] - result_item = None - received_item = True + ready = wait(readers + worker_sentinels) + is_broken = True if result_reader in ready: try: result_item = result_reader.recv() - received_item = True + is_broken = False except BaseException as e: traceback.print_exc() - if not received_item: + elif wakeup_reader in ready: + thread_wakeup.clear() + is_broken = False + result_item = None + if is_broken: # Mark the process pool broken so that submits fail right now. executor = executor_reference() if executor is not None: @@ -549,16 +539,17 @@ def __init__(self, max_workers=None, mp_context=None, # result_queue state. This avoid deadlocks caused by the non # transmission of wakeup signal when a worker died with the # _result_queue write lock. - self._queue_management_thread_sentinel = _Sentinel() + self._queue_management_thread_wakeup = _ThreadWakeup() def _start_queue_management_thread(self): if self._queue_management_thread is None: # When the executor gets lost, the weakref callback will wake up # the queue management thread. - def weakref_cb(_, wakeup=self._queue_management_thread_sentinel): + def weakref_cb(_, + thread_wakeup=self._queue_management_thread_wakeup): mp.util.debug('Executor collected: triggering callback for' ' QueueManager wakeup') - wakeup.set() + thread_wakeup.set() # Start the processes so that their sentinels are known. self._adjust_process_count() self._queue_management_thread = threading.Thread( @@ -569,12 +560,12 @@ def weakref_cb(_, wakeup=self._queue_management_thread_sentinel): self._work_ids, self._call_queue, self._result_queue, - self._queue_management_thread_sentinel), + self._queue_management_thread_wakeup), name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_sentinels[self._queue_management_thread] = \ - self._queue_management_thread_sentinel + _threads_wakeup[self._queue_management_thread] = \ + self._queue_management_thread_wakeup def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -601,7 +592,7 @@ def submit(self, fn, *args, **kwargs): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._queue_management_thread_sentinel.set() + self._queue_management_thread_wakeup.set() self._start_queue_management_thread() return f @@ -641,7 +632,7 @@ def shutdown(self, wait=True): self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread - self._queue_management_thread_sentinel.set() + self._queue_management_thread_wakeup.set() if wait: self._queue_management_thread.join() # To reduce the risk of opening too many files, remove references to diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 941ca369df4415..5bc04a675d6e37 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -95,6 +95,7 @@ def tearDown(self): class ExecutorMixin: + timeout = 30 worker_count = 5 executor_kwargs = {} @@ -115,8 +116,13 @@ def setUp(self): except NotImplementedError as e: self.skipTest(str(e)) self._prime_executor() + self.timer = threading.Timer(self.timeout, self._fail_on_deadlock) + self.timer.start() def tearDown(self): + self.timer.cancel() + self.timer.join() + del self.timer self.executor.shutdown(wait=True) self.executor = None @@ -135,6 +141,23 @@ def _prime_executor(self): for f in futures: f.result() + def _fail_on_deadlock(self, executor=None): + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + if executor is None: + executor = self.executor + import faulthandler + from tempfile import TemporaryFile + with TemporaryFile(mode="w+") as f: + faulthandler.dump_traceback(file=f) + f.seek(0) + tb = f.read() + for p in executor._processes.values(): + p.terminate() + executor.shutdown(wait=True) + print(f"\nTraceback:\n {tb}", file=sys.__stderr__) + self.fail(f"Executor deadlock:\n\n{tb}") + class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor @@ -830,22 +853,6 @@ def __reduce__(self): return _raise_error, (UnpicklingError, ) -class TimingWrapper(object): - """Creates a wrapper for a function which records the time it takes to - finish - """ - def __init__(self, func): - self.func = func - self.elapsed = None - - def __call__(self, *args, **kwds): - t = time.time() - try: - return self.func(*args, **kwds) - finally: - self.elapsed = time.time() - t - - class ExecutorDeadlockTest: # If ExecutorDeadlockTest takes more than 100secs to complete, it is very # likely caught in a deadlock. As there is no easy way to detect it, @@ -857,21 +864,6 @@ def _sleep_id(cls, x, delay): time.sleep(delay) return x - def _fail_on_deadlock(self, executor): - # If we did not recover before TIMEOUT seconds, - # consider that the executor is in a deadlock state - import faulthandler - from tempfile import TemporaryFile - with TemporaryFile(mode="w+") as f: - faulthandler.dump_traceback(file=f) - f.seek(0) - tb = f.read() - for p in executor._processes.values(): - p.terminate() - executor.shutdown(wait=True) - print(f"\nTraceback:\n {tb}", file=sys.__stderr__) - self.fail(f"Executor deadlock:\n\n{tb}") - def test_crash(self): # extensive testing for deadlock caused by crash in a pool crash_cases = [ @@ -926,6 +918,7 @@ def test_crash(self): @classmethod def _test_getpid(cls, a): + time.sleep(.01) return os.getpid() @classmethod @@ -954,14 +947,9 @@ def test_crash_races(self): # Test for external crash signal comming from neighbor # with various race setup executor = self.executor_type( - max_workers=2, mp_context=get_context(self.ctx)) - try: - raise AttributeError() - pids = [p.pid for p in executor._processes] - assert len(pids) == n_proc - except AttributeError: - pids = [pid for pid in executor.map( - self._test_getpid, [None] * n_proc)] + max_workers=n_proc, mp_context=get_context(self.ctx)) + pids = [pid for pid in executor.map( + self._test_getpid, [None] * n_proc)] assert None not in pids res = self.executor.map( self._sleep_id, repeat(True, 2 * n_proc), @@ -982,9 +970,10 @@ def test_crash_races(self): def test_shutdown_deadlock(self): # Test that the pool calling shutdown do not cause deadlock # if a worker failed - + self.executor.shutdown(wait=True) with self.executor_type(max_workers=2, mp_context=get_context(self.ctx)) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock executor.submit(self._test_kill_worker, ()) time.sleep(.01) executor.shutdown(wait=True) From 7711778da13895138f2aca3468163ea35baba9d9 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Tue, 28 Nov 2017 15:25:00 +0100 Subject: [PATCH 08/15] CLN fix some typo+better bpe msg+fix some test --- Lib/concurrent/futures/process.py | 49 ++++++++++++++++------------- Lib/test/test_concurrent_futures.py | 7 +++-- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 1ef489b4e48c17..0ac243c173d8c8 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -73,7 +73,7 @@ # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_threads_wakeup = weakref.WeakKeyDictionary() +_threads_wakeups = weakref.WeakKeyDictionary() _global_shutdown = False @@ -83,7 +83,7 @@ class _ThreadWakeup: def __init__(self): self._reader, self._writer = mp.Pipe(duplex=False) - def set(self): + def wakeup(self): self._writer.send_bytes(b"") def clear(self): @@ -94,9 +94,9 @@ def clear(self): def _python_exit(): global _global_shutdown _global_shutdown = True - items = list(_threads_wakeup.items()) + items = list(_threads_wakeups.items()) for t, thread_wakeup in items: - thread_wakeup.set() + thread_wakeup.wakeup() for t, q in items: t.join() @@ -163,7 +163,6 @@ def _on_queue_feeder_error(self, e, obj): # work_item can be None if another process terminated (see above) if work_item is not None: work_item.future.set_exception(e) - del work_item else: super()._on_queue_feeder_error(e, obj) @@ -348,13 +347,15 @@ def shutdown_worker(): worker_sentinels = [p.sentinel for p in processes.values()] ready = wait(readers + worker_sentinels) + cause = None is_broken = True if result_reader in ready: try: result_item = result_reader.recv() is_broken = False except BaseException as e: - traceback.print_exc() + cause = traceback.format_exception(type(e), e, e.__traceback__) + elif wakeup_reader in ready: thread_wakeup.clear() is_broken = False @@ -368,14 +369,15 @@ def shutdown_worker(): 'usable anymore') executor._shutdown_thread = True executor = None + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): - work_item.future.set_exception( - BrokenProcessPool( - "A process in the process pool was " - "terminated abruptly while the future was " - "running or pending." - )) + work_item.future.set_exception(bpe) # Delete references to object. See issue16284 del work_item pending_work_items.clear() @@ -535,21 +537,24 @@ def __init__(self, max_workers=None, mp_context=None, self._result_queue = mp_context.SimpleQueue() self._work_ids = queue.Queue() - # Permits to wake_up the queue_manager_thread independently of - # result_queue state. This avoid deadlocks caused by the non - # transmission of wakeup signal when a worker died with the - # _result_queue write lock. + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of queue_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send the wakeup signal to the queue_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. self._queue_management_thread_wakeup = _ThreadWakeup() def _start_queue_management_thread(self): if self._queue_management_thread is None: - # When the executor gets lost, the weakref callback will wake up - # the queue management thread. + # When the executor gets garbarge collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. def weakref_cb(_, thread_wakeup=self._queue_management_thread_wakeup): mp.util.debug('Executor collected: triggering callback for' ' QueueManager wakeup') - thread_wakeup.set() + thread_wakeup.wakeup() # Start the processes so that their sentinels are known. self._adjust_process_count() self._queue_management_thread = threading.Thread( @@ -564,7 +569,7 @@ def weakref_cb(_, name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_wakeup[self._queue_management_thread] = \ + _threads_wakeups[self._queue_management_thread] = \ self._queue_management_thread_wakeup def _adjust_process_count(self): @@ -592,7 +597,7 @@ def submit(self, fn, *args, **kwargs): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._queue_management_thread_wakeup.set() + self._queue_management_thread_wakeup.wakeup() self._start_queue_management_thread() return f @@ -632,7 +637,7 @@ def shutdown(self, wait=True): self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread - self._queue_management_thread_wakeup.set() + self._queue_management_thread_wakeup.wakeup() if wait: self._queue_management_thread.join() # To reduce the risk of opening too many files, remove references to diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 5bc04a675d6e37..835ec96608708e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -865,6 +865,7 @@ def _sleep_id(cls, x, delay): return x def test_crash(self): + self.executor.shutdown(wait=True) # extensive testing for deadlock caused by crash in a pool crash_cases = [ # Check problem occuring while pickling a task in @@ -940,8 +941,8 @@ def _test_kill_worker(cls, pid=None, delay=0.01): time.sleep(.5) def test_crash_races(self): + self.executor.shutdown(wait=True) - from itertools import repeat for n_proc in [1, 2, 5, 17]: with self.subTest(n_proc=n_proc): # Test for external crash signal comming from neighbor @@ -951,8 +952,8 @@ def test_crash_races(self): pids = [pid for pid in executor.map( self._test_getpid, [None] * n_proc)] assert None not in pids - res = self.executor.map( - self._sleep_id, repeat(True, 2 * n_proc), + res = executor.map( + self._sleep_id, [True] * 2 * n_proc, [.001 * (j // 2) for j in range(2 * n_proc)], chunksize=1) assert all(res) From 2dfe10ae7f65ceeed9af98c13740beb6ef28a173 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Wed, 29 Nov 2017 11:35:00 +0100 Subject: [PATCH 09/15] ENH faster thread_wakeup clear+typo --- Lib/concurrent/futures/process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 0ac243c173d8c8..d1fb493e5e0529 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -302,7 +302,7 @@ def _queue_management_worker(executor_reference, process workers. thread_wakeup: A _ThreadWakeup to allow waking up the queue_manager_thread from the main Thread and avoid deadlocks - caused by broken queues. + caused by permanently locked queues. """ executor = None @@ -357,9 +357,9 @@ def shutdown_worker(): cause = traceback.format_exception(type(e), e, e.__traceback__) elif wakeup_reader in ready: - thread_wakeup.clear() is_broken = False result_item = None + thread_wakeup.clear() if is_broken: # Mark the process pool broken so that submits fail right now. executor = executor_reference() From 533af3bcb5dc34b0ac85a72fc74027ffa0d2a0bb Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Wed, 29 Nov 2017 12:53:45 +0100 Subject: [PATCH 10/15] CLN comment on del self.timer --- Lib/test/test_concurrent_futures.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 835ec96608708e..0eadc4826e7e1b 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -122,6 +122,8 @@ def setUp(self): def tearDown(self): self.timer.cancel() self.timer.join() + # Remove the reference to self.timer to avoid the thread_cleanup + # warnings, as this class is re-used for multiple tests. del self.timer self.executor.shutdown(wait=True) self.executor = None From 919156ac4d97fb5cc6e4c85e4fa3fe7e90590572 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 1 Dec 2017 14:51:56 +0100 Subject: [PATCH 11/15] CLN improved comments for ProcessPoolExecutor --- Lib/concurrent/futures/process.py | 16 ++++++++------ Lib/test/_test_multiprocessing.py | 1 + Lib/test/test_concurrent_futures.py | 33 +---------------------------- 3 files changed, 12 insertions(+), 38 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index d1fb493e5e0529..aaa5151e017c0f 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -95,9 +95,9 @@ def _python_exit(): global _global_shutdown _global_shutdown = True items = list(_threads_wakeups.items()) - for t, thread_wakeup in items: + for _, thread_wakeup in items: thread_wakeup.wakeup() - for t, q in items: + for t, _ in items: t.join() # Controls how many more calls than processes will be queued in the call queue. @@ -160,7 +160,8 @@ def _on_queue_feeder_error(self, e, obj): tb = traceback.format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) - # work_item can be None if another process terminated (see above) + # work_item can be None if another process terminated. In this case, + # the queue_manager_thread fails all work_items with BrokenProcessPool if work_item is not None: work_item.future.set_exception(e) else: @@ -315,7 +316,7 @@ def shutdown_worker(): n_children_alive = sum(p.is_alive() for p in processes.values()) n_children_to_stop = n_children_alive n_sentinels_sent = 0 - # Sent the right number of sentinels, to make sure all children are + # Send the right number of sentinels, to make sure all children are # properly terminated. while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: for i in range(n_children_to_stop - n_sentinels_sent): @@ -343,7 +344,10 @@ def shutdown_worker(): call_queue) # Wait for a result to be ready in the result_queue while checking - # that all worker processes are still running. + # that all worker processes are still running, or for a wake up + # signal send. The wake up signals come either from new tasks being + # submitted, from the executor being shutdown/gc-ed, or from the + # shutdown of the python interpreter. worker_sentinels = [p.sentinel for p in processes.values()] ready = wait(readers + worker_sentinels) @@ -509,7 +513,7 @@ def __init__(self, max_workers=None, mp_context=None, self._initializer = initializer self._initargs = initargs - # Management threads + # Management thread self._queue_management_thread = None # Map of pids to processes diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index b2e96a51141d58..0dfc705f441f66 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1054,6 +1054,7 @@ def _on_queue_feeder_error(e, obj): obj.on_queue_feeder_error_was_called = True not_serializable_obj = NotSerializable() + # The captured_stderr reduces the noise in the test report with test.support.captured_stderr(): q = SafeQueue(ctx=multiprocessing.get_context()) q.put(not_serializable_obj) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 0eadc4826e7e1b..b7368e686d2506 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -902,10 +902,7 @@ def test_crash(self): ] for func, args, error, name in crash_cases: with self.subTest(name): - # skip the test involving pickle errors with manager as it - # breaks the manager and not the pool in this cases - # skip the test involving pickle errors with thread as the - # tasks and results are not pickled in this case + # The captured_stderr reduces the noise in the test report with test.support.captured_stderr(): executor = self.executor_type( max_workers=2, mp_context=get_context(self.ctx)) @@ -942,34 +939,6 @@ def _test_kill_worker(cls, pid=None, delay=0.01): # Give some time for the Executor to detect the failure time.sleep(.5) - def test_crash_races(self): - self.executor.shutdown(wait=True) - - for n_proc in [1, 2, 5, 17]: - with self.subTest(n_proc=n_proc): - # Test for external crash signal comming from neighbor - # with various race setup - executor = self.executor_type( - max_workers=n_proc, mp_context=get_context(self.ctx)) - pids = [pid for pid in executor.map( - self._test_getpid, [None] * n_proc)] - assert None not in pids - res = executor.map( - self._sleep_id, [True] * 2 * n_proc, - [.001 * (j // 2) for j in range(2 * n_proc)], - chunksize=1) - assert all(res) - res = executor.map(self._test_kill_worker, pids[::-1], - timeout=self.TIMEOUT) - with self.assertRaises(BrokenProcessPool): - try: - [v for v in res] - except futures.TimeoutError: - # If we did not recover before TIMEOUT seconds, - # consider that the executor is in a deadlock state - self._fail_on_deadlock(executor) - executor.shutdown(wait=True) - def test_shutdown_deadlock(self): # Test that the pool calling shutdown do not cause deadlock # if a worker failed From c5b78fe08caad9fc348e1b2ed6388d4ef4d7d75b Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 1 Dec 2017 16:15:48 +0100 Subject: [PATCH 12/15] CLN remove specific per-test timer --- Lib/test/test_concurrent_futures.py | 41 +++++++++++++---------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index b7368e686d2506..e4f8d88a93d852 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -116,15 +116,10 @@ def setUp(self): except NotImplementedError as e: self.skipTest(str(e)) self._prime_executor() - self.timer = threading.Timer(self.timeout, self._fail_on_deadlock) - self.timer.start() def tearDown(self): - self.timer.cancel() - self.timer.join() # Remove the reference to self.timer to avoid the thread_cleanup # warnings, as this class is re-used for multiple tests. - del self.timer self.executor.shutdown(wait=True) self.executor = None @@ -143,23 +138,6 @@ def _prime_executor(self): for f in futures: f.result() - def _fail_on_deadlock(self, executor=None): - # If we did not recover before TIMEOUT seconds, - # consider that the executor is in a deadlock state - if executor is None: - executor = self.executor - import faulthandler - from tempfile import TemporaryFile - with TemporaryFile(mode="w+") as f: - faulthandler.dump_traceback(file=f) - f.seek(0) - tb = f.read() - for p in executor._processes.values(): - p.terminate() - executor.shutdown(wait=True) - print(f"\nTraceback:\n {tb}", file=sys.__stderr__) - self.fail(f"Executor deadlock:\n\n{tb}") - class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor @@ -866,6 +844,21 @@ def _sleep_id(cls, x, delay): time.sleep(delay) return x + def _fail_on_deadlock(self, executor): + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + import faulthandler + from tempfile import TemporaryFile + with TemporaryFile(mode="w+") as f: + faulthandler.dump_traceback(file=f) + f.seek(0) + tb = f.read() + for p in executor._processes.values(): + p.terminate() + executor.shutdown(wait=True) + print(f"\nTraceback:\n {tb}", file=sys.__stderr__) + self.fail(f"Executor deadlock:\n\n{tb}") + def test_crash(self): self.executor.shutdown(wait=True) # extensive testing for deadlock caused by crash in a pool @@ -898,7 +891,9 @@ def test_crash(self): # Check problem occuring while unpickling a task in # the result_handler thread (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool, - "error during result unpickle in result_handler") + "error during result unpickle in result_handler"), + (_return_instance, (ExitAtUnpickle,), BrokenProcessPool, + "exit during result unpickle in result_handler") ] for func, args, error, name in crash_cases: with self.subTest(name): From 874f263096cc02aa8e61a593301a82402328d8a6 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 1 Dec 2017 16:44:56 +0100 Subject: [PATCH 13/15] CLN remove unnecessary code and improve test_shutdown_deadlock --- Lib/test/test_concurrent_futures.py | 47 +++++++++++------------------ 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index e4f8d88a93d852..37261a01507006 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -771,26 +771,28 @@ def hide_process_stderr(): setattr(sys, "stderr", io.StringIO()) -def _crash(): - """Induces a segfault""" +def _crash(delay=None): + """Induces a segfault.""" + if delay: + time.sleep(delay) import faulthandler faulthandler.disable() faulthandler._sigsegv() def _exit(): - """Induces a sys exit with exitcode 1""" + """Induces a sys exit with exitcode 1.""" sys.exit(1) def _raise_error(Err): - """Function that raises an Exception in process""" + """Function that raises an Exception in process.""" hide_process_stderr() raise Err() def _return_instance(cls): - """Function that returns a instance of cls""" + """Function that returns a instance of cls.""" hide_process_stderr() return cls() @@ -845,8 +847,9 @@ def _sleep_id(cls, x, delay): return x def _fail_on_deadlock(self, executor): - # If we did not recover before TIMEOUT seconds, - # consider that the executor is in a deadlock state + # If we did not recover before TIMEOUT seconds, consider that the + # executor is in a deadlock state and forcefully clean all its + # composants. import faulthandler from tempfile import TemporaryFile with TemporaryFile(mode="w+") as f: @@ -855,13 +858,16 @@ def _fail_on_deadlock(self, executor): tb = f.read() for p in executor._processes.values(): p.terminate() + # This should be safe to call executor.shutdown here as all possible + # deadlocks should have been broken. executor.shutdown(wait=True) print(f"\nTraceback:\n {tb}", file=sys.__stderr__) self.fail(f"Executor deadlock:\n\n{tb}") + def test_crash(self): + # extensive testing for deadlock caused by crashes in a pool. self.executor.shutdown(wait=True) - # extensive testing for deadlock caused by crash in a pool crash_cases = [ # Check problem occuring while pickling a task in # the task_handler thread @@ -916,34 +922,17 @@ def _test_getpid(cls, a): time.sleep(.01) return os.getpid() - @classmethod - def _test_kill_worker(cls, pid=None, delay=0.01): - """Function that send SIGKILL at process pid after delay second""" - time.sleep(delay) - if pid is None: - pid = os.getpid() - try: - from signal import SIGKILL - except ImportError: - from signal import SIGTERM as SIGKILL - # Try to kill a process in the pool, if it is not finished yet - try: - os.kill(pid, SIGKILL) - except (ProcessLookupError, PermissionError): - pass - # Give some time for the Executor to detect the failure - time.sleep(.5) - def test_shutdown_deadlock(self): # Test that the pool calling shutdown do not cause deadlock - # if a worker failed + # if a worker fails after the shutdown call. self.executor.shutdown(wait=True) with self.executor_type(max_workers=2, mp_context=get_context(self.ctx)) as executor: self.executor = executor # Allow clean up in fail_on_deadlock - executor.submit(self._test_kill_worker, ()) - time.sleep(.01) + f = executor.submit(_crash, delay=.1) executor.shutdown(wait=True) + with self.assertRaises(BrokenProcessPool): + f.result() create_executor_tests(ExecutorDeadlockTest, From 942fd79fd6c39b94438dd358528428ae2f8083fb Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 5 Jan 2018 10:40:23 +0100 Subject: [PATCH 14/15] CLN test comments + remove unused code --- Lib/test/test_concurrent_futures.py | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 37261a01507006..178d07ebf08aa0 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -95,7 +95,6 @@ def tearDown(self): class ExecutorMixin: - timeout = 30 worker_count = 5 executor_kwargs = {} @@ -118,8 +117,6 @@ def setUp(self): self._prime_executor() def tearDown(self): - # Remove the reference to self.timer to avoid the thread_cleanup - # warnings, as this class is re-used for multiple tests. self.executor.shutdown(wait=True) self.executor = None @@ -376,16 +373,14 @@ def test_del_shutdown(self): queue_management_thread = executor._queue_management_thread del executor + # Make sure that all the executor ressources were properly cleaned by + # the shutdown process queue_management_thread.join() for p in processes.values(): p.join() - # Make sure that the queue management thread was properly finished - # and the queue was closed by the shutdown process - queue_management_thread.join() call_queue.join_thread() - create_executor_tests(ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, @@ -768,7 +763,7 @@ def test_ressources_gced_in_workers(self): def hide_process_stderr(): import io - setattr(sys, "stderr", io.StringIO()) + sys.stderr = io.StringIO() def _crash(delay=None): @@ -810,7 +805,7 @@ def __reduce__(self): class ExitAtPickle(object): - """Bad object that triggers a segfault at pickling time.""" + """Bad object that triggers a process exit at pickling time.""" def __reduce__(self): _exit() @@ -822,23 +817,20 @@ def __reduce__(self): class ErrorAtPickle(object): - """Bad object that triggers a segfault at pickling time.""" + """Bad object that triggers an error at pickling time.""" def __reduce__(self): from pickle import PicklingError raise PicklingError("Error in pickle") class ErrorAtUnpickle(object): - """Bad object that triggers a process exit at unpickling time.""" + """Bad object that triggers an error at unpickling time.""" def __reduce__(self): from pickle import UnpicklingError return _raise_error, (UnpicklingError, ) class ExecutorDeadlockTest: - # If ExecutorDeadlockTest takes more than 100secs to complete, it is very - # likely caught in a deadlock. As there is no easy way to detect it, - # faulthandler will print the traceback and exit. TIMEOUT = 15 @classmethod @@ -917,11 +909,6 @@ def test_crash(self): self._fail_on_deadlock(executor) executor.shutdown(wait=True) - @classmethod - def _test_getpid(cls, a): - time.sleep(.01) - return os.getpid() - def test_shutdown_deadlock(self): # Test that the pool calling shutdown do not cause deadlock # if a worker fails after the shutdown call. From aca74f8b6bafb7cfa10932dbde02846f985155ac Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 5 Jan 2018 10:59:48 +0100 Subject: [PATCH 15/15] Improvement NEWS wording --- .../next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst index d5c4c6eee50f9b..49cbbb3b920f9f 100644 --- a/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst +++ b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst @@ -1,3 +1,4 @@ -Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor`, caused by -pickling or unpickling errors. This should make sure that calls to the -:class:`ProcessPoolExecutor` API always return. +Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when +task arguments or results cause pickling or unpickling errors. +This should make sure that calls to the :class:`ProcessPoolExecutor` API +always eventually return.