From d37e3603b2b2dc2c9fe8efcfad662d03f00876ef Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 6 Nov 2018 15:14:23 -0800 Subject: [PATCH 01/14] add tests --- Lib/test/_test_multiprocessing.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7993fcb08e465a..d37b0c3392bf5b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2571,6 +2571,16 @@ def raising(): def unpickleable_result(): return lambda: 42 +def waiting(): + while True: + time.sleep(10) + +def bad_exit(value): + if value: + from sys import exit + exit(123) + + class _TestPoolWorkerErrors(BaseTestCase): ALLOWED_TYPES = ('processes', ) @@ -2611,6 +2621,26 @@ def errback(exc): p.close() p.join() + def test_broken_process_pool1(self): + from multiprocessing.pool import BrokenProcessPool + p = multiprocessing.Pool(2) + res = p.map_async(waiting, range(3)) + # Kill one of the pool workers. + pid = p._pool[0].pid + os.kill(pid, signal.SIGTERM) + self.assertRaises(BrokenProcessPool, res.get) + p.close() + p.join() + + def test_broken_process_pool2(self): + from multiprocessing.pool import BrokenProcessPool + p = multiprocessing.Pool(2) + with self.assertRaises(BrokenProcessPool): + res = p.map(bad_exit, [0, 0, 1, 0]) + p.close() + p.join() + + class _TestPoolWorkerLifetime(BaseTestCase): ALLOWED_TYPES = ('processes', ) From bc08d853f1d1055428bf38cca0487c0daba17d2a Mon Sep 17 00:00:00 2001 From: oesteban Date: Thu, 8 Nov 2018 16:45:11 -0800 Subject: [PATCH 02/14] base patch --- Lib/multiprocessing/pool.py | 175 +++++++++++++++++++++++++----- Lib/test/_test_multiprocessing.py | 8 +- 2 files changed, 149 insertions(+), 34 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 7a6d014901463e..34a1e9de005338 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -20,12 +20,13 @@ import os import time import traceback +from signal import SIGTERM, SIGINT, SIGKILL # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. +from .connection import wait from . import util from . import get_context, TimeoutError - # # Constants representing the state of a pool # @@ -33,6 +34,7 @@ RUN = 0 CLOSE = 1 TERMINATE = 2 +BROKEN = 3 # # Miscellaneous # @@ -40,12 +42,20 @@ job_counter = itertools.count() + def mapstar(args): return list(map(*args)) def starmapstar(args): return list(itertools.starmap(args[0], args[1])) + +class BrokenProcessPool(RuntimeError): + """ + Raised when a worker of a Pool terminated abruptly + while a task was still being executed.
+ """ + # # Hack to embed stringification of remote traceback in local traceback # @@ -104,10 +114,13 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, if initializer is not None: initializer(*initargs) + util.debug('worker started') completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: + util.debug('worker picking up one task') task = get() + util.debug('worker picked up one task') except (EOFError, OSError): util.debug('worker got EOFError or OSError -- exiting') break @@ -125,6 +138,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, result = (False, e) try: put((job, i, result)) + util.debug('worker - task submitted (job %d, id %d)' % (job, i)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) util.debug("Possible encoding error while sending result: %s" % ( @@ -133,6 +147,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, task = job = result = func = args = kwds = None completed += 1 + util.debug('worker exiting after %d tasks' % completed) def _helper_reraises_exception(ex): @@ -184,12 +199,17 @@ def __init__(self, processes=None, initializer=None, initargs=(), p.join() raise + # This lock is used to ensure we don't try to send the shut + # down sentinel to worker processes while we're in the process + # of repopulating the pool. + self._repopulate_lock = threading.Lock() + self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self._cache, self._taskqueue, self._ctx, self.Process, - self._processes, self._pool, self._inqueue, self._outqueue, + self._processes, self, self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild, - self._wrap_exception) + self._wrap_exception, self._repopulate_lock) ) self._worker_handler.daemon = True self._worker_handler._state = RUN @@ -206,7 +226,8 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._result_handler = threading.Thread( target=Pool._handle_results, - args=(self._outqueue, self._quick_get, self._cache) + args=(self._outqueue, self._quick_get, self._cache, + self, self._repopulate_lock) ) self._result_handler.daemon = True self._result_handler._state = RUN @@ -220,20 +241,33 @@ def __init__(self, processes=None, initializer=None, initargs=(), exitpriority=15 ) + @staticmethod - def _join_exited_workers(pool): + def _join_exited_workers(pool, wait_pid=None): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. + + If pid is provided, explicitly call join() on the process matching that + pid. + """ cleaned = False + broken = [] for i in reversed(range(len(pool))): worker = pool[i] - if worker.exitcode is not None: + exitcode = worker.exitcode or worker._popen.poll() + if exitcode is not None: # worker exited - util.debug('cleaning up worker %d' % i) + broken.append(exitcode != 0) + util.debug('cleaning up worker %d (exitcode %d)' % (i, exitcode)) worker.join() cleaned = True del pool[i] + + if any(broken): + thread = threading.current_thread() + thread._state = BROKEN + return False return cleaned def _repopulate_pool(self): @@ -252,6 +286,7 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. 
""" + util.debug('repopulating workers...') for i in range(processes - len(pool)): w = Process(ctx, target=worker, args=(inqueue, outqueue, @@ -263,15 +298,26 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, w.daemon = True w.start() pool.append(w) - util.debug('added worker') + util.debug('added worker %d' % (len(pool) - 1)) + + def _maintain_pool(self, wait_pid=None): + return self._maintain_pool_static(self._ctx, self.Process, + self._processes, + self, self._inqueue, + self._outqueue, self._initializer, + self._initargs, + self._maxtasksperchild, + self._wrap_exception, + wait_pid=wait_pid) @staticmethod - def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, - initializer, initargs, maxtasksperchild, - wrap_exception): + def _maintain_pool_static(ctx, Process, processes, poolobj, inqueue, outqueue, + initializer, initargs, maxtasksperchild, + wrap_exception, wait_pid=None): """Clean up any exited workers and start replacements for them. """ - if Pool._join_exited_workers(pool): + pool = poolobj._pool + if Pool._join_exited_workers(pool, wait_pid=wait_pid): Pool._repopulate_pool_static(ctx, Process, processes, pool, inqueue, outqueue, initializer, initargs, maxtasksperchild, @@ -433,25 +479,28 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, return result @staticmethod - def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, + def _handle_workers(cache, taskqueue, ctx, Process, processes, poolobj, inqueue, outqueue, initializer, initargs, - maxtasksperchild, wrap_exception): + maxtasksperchild, wrap_exception, repopulate_lock): thread = threading.current_thread() + util.debug('worker handler entering') # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. - while thread._state == RUN or (cache and thread._state != TERMINATE): - Pool._maintain_pool(ctx, Process, processes, pool, inqueue, - outqueue, initializer, initargs, - maxtasksperchild, wrap_exception) + while thread._state == RUN or (cache and thread._state not in (BROKEN, TERMINATE)): + Pool._maintain_pool_static(ctx, Process, processes, poolobj, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception) time.sleep(0.1) # send sentinel to stop workers - taskqueue.put(None) + with repopulate_lock: + taskqueue.put(None) util.debug('worker handler exiting') @staticmethod def _handle_tasks(taskqueue, put, outqueue, pool, cache): thread = threading.current_thread() + util.debug('task handler entering') for taskseq, set_length in iter(taskqueue.get, None): task = None @@ -462,7 +511,9 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache): util.debug('task handler found thread._state != RUN') break try: + util.debug('task handler adding task %d with id %d' % task[:2]) put(task) + util.debug('task handler added task %d with id %d' % task[:2]) except Exception as e: job, idx = task[:2] try: @@ -496,10 +547,22 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache): util.debug('task handler exiting') @staticmethod - def _handle_results(outqueue, get, cache): + def _handle_results(outqueue, get, cache, pool, repopulate_lock): thread = threading.current_thread() + is_process = hasattr(outqueue, '_reader') + util.debug('result handler entering') while 1: + # Determine if the Pool has been broken. + if is_process and Pool._handle_broken_pool(outqueue, pool._pool, cache): + # Mark worker handler as broken. We need this for terminate + # to succeed. 
+ util.debug('setting all handlers to BROKEN state') + thread._state = BROKEN + pool._handle_workers._state = BROKEN + pool._handle_tasks._state = BROKEN + break + try: task = get() except (OSError, EOFError): @@ -515,6 +578,14 @@ def _handle_results(outqueue, get, cache): util.debug('result handler got sentinel') break + if isinstance(task, int): + # A worker is exiting due to maxtasksperchild. Wait + # for that process to actually exit before continuing. + util.debug('waiting on PID=%d' % task) + with repopulate_lock: + pool._maintain_pool(wait_pid=task) + continue + job, i, obj = task try: cache[job]._set(i, obj) @@ -522,7 +593,7 @@ def _handle_results(outqueue, get, cache): pass task = job = obj = None - while cache and thread._state != TERMINATE: + while cache and thread._state not in (TERMINATE, BROKEN): try: task = get() except (OSError, EOFError): @@ -532,6 +603,8 @@ def _handle_results(outqueue, get, cache): if task is None: util.debug('result handler ignoring extra sentinel') continue + if isinstance(task, int): + continue job, i, obj = task try: cache[job]._set(i, obj) @@ -539,7 +612,7 @@ def _handle_results(outqueue, get, cache): pass task = job = obj = None - if hasattr(outqueue, '_reader'): + if is_process: util.debug('ensuring that outqueue is not full') # If we don't make room available in outqueue then # attempts to add the sentinel (None) to outqueue may @@ -555,6 +628,33 @@ def _handle_results(outqueue, get, cache): util.debug('result handler exiting: len(cache)=%s, thread._state=%s', len(cache), thread._state) + @staticmethod + def _handle_broken_pool(outqueue, pool, cache): + """ Handle case where a process has unexpectedly exited. + + If a Process has exited without alerting the result_handler + thread first, we need to close/terminate the pool and abort + all running tasks. + + """ + result_reader = outqueue._reader + ready = wait([result_reader] + [p.sentinel for p in pool]) + if result_reader in ready: + return False + + terminated = [p._popen.poll() for p in pool] + if all([t in [0, None] for t in terminated]): + util.debug("worker is finished: %s" % terminated) + return False + + util.debug("pool is broken: %s" % terminated) + for i, cache_ent in list(cache.items()): + err = BrokenProcessPool( + 'A worker of the pool terminated abruptly ' + 'while the child process was still executing.') + cache_ent._set(i, (False, err)) + return True + @staticmethod def _get_tasks(func, it, size): it = iter(it) @@ -578,7 +678,8 @@ def close(self): def terminate(self): util.debug('terminating pool') self._state = TERMINATE - self._worker_handler._state = TERMINATE + if self._worker_handler._state != BROKEN: + self._worker_handler._state = TERMINATE self._terminate() def join(self): @@ -606,13 +707,21 @@ def _help_stuff_finish(inqueue, task_handler, size): def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once - util.debug('finalizing pool') + util.debug('terminate pool started: should be called once at most') + is_broken = BROKEN in (task_handler._state, + worker_handler._state, + result_handler._state) - worker_handler._state = TERMINATE task_handler._state = TERMINATE + worker_handler._state = TERMINATE - util.debug('helping task handler/workers to finish') - cls._help_stuff_finish(inqueue, task_handler, len(pool)) + # Skip _help_finish_stuff if the pool is broken, because + # the broken process may have been holding the inqueue lock. 
+ if not is_broken: + util.debug('helping task handler/workers to finish') + cls._help_stuff_finish(inqueue, task_handler, len(pool)) + else: + util.debug('finalizing BROKEN procress pool') if (not result_handler.is_alive()) and (len(cache) != 0): raise AssertionError( @@ -623,8 +732,8 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, # We must wait for the worker handler to exit before terminating # workers because we don't want workers to be restarted behind our back. - util.debug('joining worker handler') if threading.current_thread() is not worker_handler: + util.debug('joining worker handler') worker_handler.join() # Terminate workers which haven't already finished. @@ -634,12 +743,12 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, if p.exitcode is None: p.terminate() - util.debug('joining task handler') if threading.current_thread() is not task_handler: + util.debug('joining task handler') task_handler.join() - util.debug('joining result handler') if threading.current_thread() is not result_handler: + util.debug('joining result handler') result_handler.join() if pool and hasattr(pool[0], 'terminate'): @@ -650,6 +759,8 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, util.debug('cleaning up worker %d' % p.pid) p.join() + util.debug('terminate pool finalized') + def __enter__(self): return self @@ -841,6 +952,10 @@ def _setup_queues(self): self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get + @staticmethod + def _handle_broken_pool(outqueue, pool, cache): + return False + @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # drain inqueue, and put sentinels at its head to make workers finish diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d37b0c3392bf5b..db8ecc748b91a2 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2571,9 +2571,8 @@ def raising(): def unpickleable_result(): return lambda: 42 -def waiting(): - while True: - time.sleep(10) +def waiting(args): + time.sleep(7) def bad_exit(value): if value: @@ -2628,7 +2627,8 @@ def test_broken_process_pool1(self): # Kill one of the pool workers. 
pid = p._pool[0].pid os.kill(pid, signal.SIGTERM) - self.assertRaises(BrokenProcessPool, res.get) + with self.assertRaises(BrokenProcessPool): + res.get(timeout=10) p.close() p.join() From 4eac116969b54bbf2fdef2645bf0bc9395e453b8 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 9 Nov 2018 11:01:31 -0800 Subject: [PATCH 03/14] finishing up fix --- Lib/multiprocessing/pool.py | 161 ++++++++++-------------------- Lib/test/_test_multiprocessing.py | 18 +++- 2 files changed, 70 insertions(+), 109 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 34a1e9de005338..7a9a454719908f 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -118,9 +118,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: - util.debug('worker picking up one task') task = get() - util.debug('worker picked up one task') except (EOFError, OSError): util.debug('worker got EOFError or OSError -- exiting') break @@ -138,7 +136,6 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, result = (False, e) try: put((job, i, result)) - util.debug('worker - task submitted (job %d, id %d)' % (job, i)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) util.debug("Possible encoding error while sending result: %s" % ( @@ -182,7 +179,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), if processes is None: processes = os.cpu_count() or 1 if processes < 1: - raise ValueError("Number of processes must be at least 1") + raise ValueError("Number of processes must be 2 or more") if initializer is not None and not callable(initializer): raise TypeError('initializer must be a callable') @@ -199,17 +196,12 @@ def __init__(self, processes=None, initializer=None, initargs=(), p.join() raise - # This lock is used to ensure we don't try to send the shut - # down sentinel to worker processes while we're in the process - # of repopulating the pool. - self._repopulate_lock = threading.Lock() - self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self._cache, self._taskqueue, self._ctx, self.Process, - self._processes, self, self._inqueue, self._outqueue, + self._processes, self._pool, self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild, - self._wrap_exception, self._repopulate_lock) + self._wrap_exception) ) self._worker_handler.daemon = True self._worker_handler._state = RUN @@ -226,8 +218,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._result_handler = threading.Thread( target=Pool._handle_results, - args=(self._outqueue, self._quick_get, self._cache, - self, self._repopulate_lock) + args=(self._outqueue, self._quick_get, self._cache) ) self._result_handler.daemon = True self._result_handler._state = RUN @@ -243,7 +234,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), @staticmethod - def _join_exited_workers(pool, wait_pid=None): + def _join_exited_workers(pool): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. 
@@ -255,19 +246,27 @@ def _join_exited_workers(pool, wait_pid=None): broken = [] for i in reversed(range(len(pool))): worker = pool[i] - exitcode = worker.exitcode or worker._popen.poll() - if exitcode is not None: + broken.append(worker.exitcode not in (None, 0)) + if worker.exitcode is not None: # worker exited - broken.append(exitcode != 0) - util.debug('cleaning up worker %d (exitcode %d)' % (i, exitcode)) + util.debug('cleaning up worker %d' % i) worker.join() cleaned = True del pool[i] if any(broken): + # Stop all workers + util.info('worker handler: process pool is broken, terminating workers...') thread = threading.current_thread() thread._state = BROKEN - return False + for p in pool: + if p.exitcode is None: + p.terminate() + for p in pool: + p.join() + + del pool[:] + return None return cleaned def _repopulate_pool(self): @@ -286,7 +285,6 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ - util.debug('repopulating workers...') for i in range(processes - len(pool)): w = Process(ctx, target=worker, args=(inqueue, outqueue, @@ -298,30 +296,24 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, w.daemon = True w.start() pool.append(w) - util.debug('added worker %d' % (len(pool) - 1)) - - def _maintain_pool(self, wait_pid=None): - return self._maintain_pool_static(self._ctx, self.Process, - self._processes, - self, self._inqueue, - self._outqueue, self._initializer, - self._initargs, - self._maxtasksperchild, - self._wrap_exception, - wait_pid=wait_pid) + util.debug('added worker') @staticmethod - def _maintain_pool_static(ctx, Process, processes, poolobj, inqueue, outqueue, - initializer, initargs, maxtasksperchild, - wrap_exception, wait_pid=None): + def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, + initializer, initargs, maxtasksperchild, + wrap_exception): """Clean up any exited workers and start replacements for them. + + """ - pool = poolobj._pool - if Pool._join_exited_workers(pool, wait_pid=wait_pid): + thread = threading.current_thread() + need_repopulate = Pool._join_exited_workers(pool) + if need_repopulate: Pool._repopulate_pool_static(ctx, Process, processes, pool, inqueue, outqueue, initializer, initargs, maxtasksperchild, wrap_exception) + return need_repopulate def _setup_queues(self): self._inqueue = self._ctx.SimpleQueue() @@ -479,22 +471,31 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, return result @staticmethod - def _handle_workers(cache, taskqueue, ctx, Process, processes, poolobj, + def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, inqueue, outqueue, initializer, initargs, - maxtasksperchild, wrap_exception, repopulate_lock): + maxtasksperchild, wrap_exception): thread = threading.current_thread() util.debug('worker handler entering') # Keep maintaining workers until the cache gets drained, unless the pool - # is terminated. - while thread._state == RUN or (cache and thread._state not in (BROKEN, TERMINATE)): - Pool._maintain_pool_static(ctx, Process, processes, poolobj, inqueue, - outqueue, initializer, initargs, - maxtasksperchild, wrap_exception) + # is terminated or broken. 
+ while thread._state == RUN or (cache and thread._state not in (TERMINATE, BROKEN)): + new_workers = Pool._maintain_pool( + ctx, Process, processes, pool, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception) + if new_workers is None: + thread._state = BROKEN + for i, cache_ent in list(cache.items()): + err = BrokenProcessPool( + 'A worker of the pool terminated abruptly ' + 'while the child process was still executing.') + while cache_ent._number_left > 0: + # Exhaust MapResult with errors + cache_ent._set(i, (False, err)) time.sleep(0.1) # send sentinel to stop workers - with repopulate_lock: - taskqueue.put(None) + taskqueue.put(None) util.debug('worker handler exiting') @staticmethod @@ -511,9 +512,7 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache): util.debug('task handler found thread._state != RUN') break try: - util.debug('task handler adding task %d with id %d' % task[:2]) put(task) - util.debug('task handler added task %d with id %d' % task[:2]) except Exception as e: job, idx = task[:2] try: @@ -547,22 +546,11 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache): util.debug('task handler exiting') @staticmethod - def _handle_results(outqueue, get, cache, pool, repopulate_lock): - thread = threading.current_thread() - is_process = hasattr(outqueue, '_reader') + def _handle_results(outqueue, get, cache): util.debug('result handler entering') + thread = threading.current_thread() while 1: - # Determine if the Pool has been broken. - if is_process and Pool._handle_broken_pool(outqueue, pool._pool, cache): - # Mark worker handler as broken. We need this for terminate - # to succeed. - util.debug('setting all handlers to BROKEN state') - thread._state = BROKEN - pool._handle_workers._state = BROKEN - pool._handle_tasks._state = BROKEN - break - try: task = get() except (OSError, EOFError): @@ -578,14 +566,6 @@ def _handle_results(outqueue, get, cache, pool, repopulate_lock): util.debug('result handler got sentinel') break - if isinstance(task, int): - # A worker is exiting due to maxtasksperchild. Wait - # for that process to actually exit before continuing. - util.debug('waiting on PID=%d' % task) - with repopulate_lock: - pool._maintain_pool(wait_pid=task) - continue - job, i, obj = task try: cache[job]._set(i, obj) @@ -593,7 +573,7 @@ def _handle_results(outqueue, get, cache, pool, repopulate_lock): pass task = job = obj = None - while cache and thread._state not in (TERMINATE, BROKEN): + while cache and thread._state != TERMINATE: try: task = get() except (OSError, EOFError): @@ -603,8 +583,7 @@ def _handle_results(outqueue, get, cache, pool, repopulate_lock): if task is None: util.debug('result handler ignoring extra sentinel') continue - if isinstance(task, int): - continue + job, i, obj = task try: cache[job]._set(i, obj) @@ -612,7 +591,7 @@ def _handle_results(outqueue, get, cache, pool, repopulate_lock): pass task = job = obj = None - if is_process: + if hasattr(outqueue, '_reader'): util.debug('ensuring that outqueue is not full') # If we don't make room available in outqueue then # attempts to add the sentinel (None) to outqueue may @@ -628,33 +607,6 @@ def _handle_results(outqueue, get, cache, pool, repopulate_lock): util.debug('result handler exiting: len(cache)=%s, thread._state=%s', len(cache), thread._state) - @staticmethod - def _handle_broken_pool(outqueue, pool, cache): - """ Handle case where a process has unexpectedly exited. 
- - If a Process has exited without alerting the result_handler - thread first, we need to close/terminate the pool and abort - all running tasks. - - """ - result_reader = outqueue._reader - ready = wait([result_reader] + [p.sentinel for p in pool]) - if result_reader in ready: - return False - - terminated = [p._popen.poll() for p in pool] - if all([t in [0, None] for t in terminated]): - util.debug("worker is finished: %s" % terminated) - return False - - util.debug("pool is broken: %s" % terminated) - for i, cache_ent in list(cache.items()): - err = BrokenProcessPool( - 'A worker of the pool terminated abruptly ' - 'while the child process was still executing.') - cache_ent._set(i, (False, err)) - return True - @staticmethod def _get_tasks(func, it, size): it = iter(it) @@ -678,8 +630,7 @@ def close(self): def terminate(self): util.debug('terminating pool') self._state = TERMINATE - if self._worker_handler._state != BROKEN: - self._worker_handler._state = TERMINATE + self._worker_handler._state = TERMINATE self._terminate() def join(self): @@ -707,13 +658,13 @@ def _help_stuff_finish(inqueue, task_handler, size): def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once - util.debug('terminate pool started: should be called once at most') + util.debug('terminate pool entering') is_broken = BROKEN in (task_handler._state, worker_handler._state, result_handler._state) - task_handler._state = TERMINATE worker_handler._state = TERMINATE + task_handler._state = TERMINATE # Skip _help_finish_stuff if the pool is broken, because # the broken process may have been holding the inqueue lock. @@ -952,10 +903,6 @@ def _setup_queues(self): self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get - @staticmethod - def _handle_broken_pool(outqueue, pool, cache): - return False - @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # drain inqueue, and put sentinels at its head to make workers finish diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index db8ecc748b91a2..1038ca1734e7d2 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2623,16 +2623,30 @@ def errback(exc): def test_broken_process_pool1(self): from multiprocessing.pool import BrokenProcessPool p = multiprocessing.Pool(2) - res = p.map_async(waiting, range(3)) + res = p.map_async(waiting, range(10)) # Kill one of the pool workers. + waiting(None) pid = p._pool[0].pid os.kill(pid, signal.SIGTERM) with self.assertRaises(BrokenProcessPool): - res.get(timeout=10) + res.get() p.close() p.join() + def test_broken_process_pool2(self): + from multiprocessing.pool import BrokenProcessPool + p = multiprocessing.Pool(2) + res = p.map_async(waiting, [1]) + # Kill one of the pool workers. 
+ pid = p._pool[0].pid + os.kill(pid, signal.SIGTERM) + with self.assertRaises(BrokenProcessPool): + res.get() + p.close() + p.join() + + def test_broken_process_pool3(self): from multiprocessing.pool import BrokenProcessPool p = multiprocessing.Pool(2) with self.assertRaises(BrokenProcessPool): From c8ba754a7cbd4a3dc66d633b963a3a9f93027172 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 9 Nov 2018 11:20:52 -0800 Subject: [PATCH 04/14] cleanup not needed imports --- Lib/multiprocessing/pool.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 7a9a454719908f..f694f13a4e8976 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -20,11 +20,9 @@ import os import time import traceback -from signal import SIGTERM, SIGINT, SIGKILL # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. -from .connection import wait from . import util from . import get_context, TimeoutError # From b36663bf63130b77319ce3019f85448b20319157 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 9 Nov 2018 12:35:07 -0800 Subject: [PATCH 05/14] avert race condition --- Lib/multiprocessing/pool.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f694f13a4e8976..e11fadbeb7a805 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -25,6 +25,7 @@ # we avoid top-level imports which are liable to fail on some systems. from . import util from . import get_context, TimeoutError + # # Constants representing the state of a pool # @@ -44,6 +45,7 @@ def mapstar(args): return list(map(*args)) + def starmapstar(args): return list(itertools.starmap(args[0], args[1])) @@ -235,10 +237,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), def _join_exited_workers(pool): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. - - If pid is provided, explicitly call join() on the process matching that - pid. - + Returns None if the process pool is broken. """ cleaned = False broken = [] @@ -255,8 +254,6 @@ def _join_exited_workers(pool): if any(broken): # Stop all workers util.info('worker handler: process pool is broken, terminating workers...') - thread = threading.current_thread() - thread._state = BROKEN for p in pool: if p.exitcode is None: p.terminate() @@ -477,7 +474,7 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, # Keep maintaining workers until the cache gets drained, unless the pool # is terminated or broken. 
- while thread._state == RUN or (cache and thread._state not in (TERMINATE, BROKEN)): + while thread._state == RUN or (cache and thread._state != TERMINATE): new_workers = Pool._maintain_pool( ctx, Process, processes, pool, inqueue, outqueue, initializer, initargs, @@ -488,8 +485,8 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, err = BrokenProcessPool( 'A worker of the pool terminated abruptly ' 'while the child process was still executing.') + # Exhaust MapResult with errors while cache_ent._number_left > 0: - # Exhaust MapResult with errors cache_ent._set(i, (False, err)) time.sleep(0.1) # send sentinel to stop workers @@ -623,7 +620,9 @@ def close(self): util.debug('closing pool') if self._state == RUN: self._state = CLOSE - self._worker_handler._state = CLOSE + # Avert race condition in broken pools + if self._worker_handler._state != BROKEN: + self._worker_handler._state = CLOSE def terminate(self): util.debug('terminating pool') From f8500e22904704e0b519e2d20b4fad854f747853 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 9 Nov 2018 12:44:16 -0800 Subject: [PATCH 06/14] add documentation --- Doc/library/multiprocessing.rst | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 578b5483286af0..6e568115c881f6 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2108,6 +2108,12 @@ with the :class:`Pool` class. .. versionadded:: 3.4 *context* + .. versionchanged:: 3.8 + When one of the worker processes terminates abruptly (e.g. the + Out Of Memory Killer of linux kicked in), a :exc:`BrokenProcessPool` + error is now raised. Previously, behavior was undefined and + the :class:`Pool` or its workers would often freeze or deadlock. + .. note:: Worker processes within a :class:`Pool` typically live for the complete @@ -2225,6 +2231,12 @@ with the :class:`Pool` class. :ref:`typecontextmanager`. :meth:`~contextmanager.__enter__` returns the pool object, and :meth:`~contextmanager.__exit__` calls :meth:`terminate`. + .. exception:: BrokenProcessPool + + Derived from :exc:`RuntimeError`, this exception class is raised when + one of the workers of a :class:`Pool` has terminated in a non-clean + fashion (for example, if it was killed from the outside). + .. class:: AsyncResult From 848d3040c29217f93c81ba20a7c632b7e5e67b65 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 9 Nov 2018 12:47:14 -0800 Subject: [PATCH 07/14] make patchcheck --- Doc/library/multiprocessing.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 6e568115c881f6..6b764c2d025285 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2109,7 +2109,7 @@ with the :class:`Pool` class. *context* .. versionchanged:: 3.8 - When one of the worker processes terminates abruptly (e.g. the + When one of the worker processes terminates abruptly (e.g. the Out Of Memory Killer of linux kicked in), a :exc:`BrokenProcessPool` error is now raised. Previously, behavior was undefined and the :class:`Pool` or its workers would often freeze or deadlock. 
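The behavior documented above can be exercised end to end. What follows is a minimal sketch of client code, assuming an interpreter built with this patch series applied; the slow() helper is hypothetical, and reading the private p._pool attribute to pick a victim PID is a test-only device borrowed from _test_multiprocessing.py, not public API:

    import os
    import signal
    import time
    import multiprocessing
    from multiprocessing.pool import BrokenProcessPool

    def slow(x):
        time.sleep(2)
        return x

    if __name__ == '__main__':
        with multiprocessing.Pool(2) as p:
            res = p.map_async(slow, range(4))
            time.sleep(0.5)   # let the workers pick up some tasks
            # Simulate an abrupt worker death (e.g. an external SIGTERM).
            os.kill(p._pool[0].pid, signal.SIGTERM)
            try:
                res.get()
            except BrokenProcessPool:
                # The pool is no longer usable; a retry needs a fresh Pool.
                print('pool is broken')

Before this series, the res.get() call above would block forever; with it, the worker handler notices the dead worker and fails the pending job instead.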
From 1f9332208372a9a792ad1d68aac3bc4ed899af93 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 9 Nov 2018 13:02:55 -0800 Subject: [PATCH 08/14] add News entry --- .../next/Library/2018-11-09-13-02-35.bpo-22393.cqalgV.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2018-11-09-13-02-35.bpo-22393.cqalgV.rst diff --git a/Misc/NEWS.d/next/Library/2018-11-09-13-02-35.bpo-22393.cqalgV.rst b/Misc/NEWS.d/next/Library/2018-11-09-13-02-35.bpo-22393.cqalgV.rst new file mode 100644 index 00000000000000..b4d107dff85f6b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-11-09-13-02-35.bpo-22393.cqalgV.rst @@ -0,0 +1,2 @@ +Fix ``multiprocessing.Pool`` hanging indefinitely when a worker process dies +unexpectedly. Patch by Oscar Esteban, based on code from Dan O'Reilly. From a172df6865e4964bffc6bb12dbcc7498558bcbcb Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Sun, 11 Nov 2018 17:16:29 -0800 Subject: [PATCH 09/14] stylistic fixes, avoid shadowing ``worker`` variable name --- Lib/multiprocessing/pool.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index e11fadbeb7a805..90cf774a308d46 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -144,7 +144,6 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, task = job = result = func = args = kwds = None completed += 1 - util.debug('worker exiting after %d tasks' % completed) def _helper_reraises_exception(ex): @@ -241,13 +240,12 @@ def _join_exited_workers(pool): """ cleaned = False broken = [] - for i in reversed(range(len(pool))): - worker = pool[i] - broken.append(worker.exitcode not in (None, 0)) - if worker.exitcode is not None: + for i, p in reversed(list(enumerate(pool))): + broken.append(p.exitcode not in (None, 0)) + if p.exitcode is not None: # worker exited util.debug('cleaning up worker %d' % i) - worker.join() + p.join() cleaned = True del pool[i] @@ -259,7 +257,6 @@ def _join_exited_workers(pool): p.terminate() for p in pool: p.join() - del pool[:] return None return cleaned @@ -298,8 +295,6 @@ def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, initializer, initargs, maxtasksperchild, wrap_exception): """Clean up any exited workers and start replacements for them. - - """ thread = threading.current_thread() need_repopulate = Pool._join_exited_workers(pool) if need_repopulate: @@ -473,7 +468,7 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, util.debug('worker handler entering') # Keep maintaining workers until the cache gets drained, unless the pool - # is terminated or broken. + # is terminated. 
while thread._state == RUN or (cache and thread._state != TERMINATE): new_workers = Pool._maintain_pool( ctx, Process, processes, pool, inqueue, @@ -578,7 +573,6 @@ def _handle_results(outqueue, get, cache): if task is None: util.debug('result handler ignoring extra sentinel') continue - job, i, obj = task try: cache[job]._set(i, obj) @@ -706,7 +700,6 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, # worker has not yet exited util.debug('cleaning up worker %d' % p.pid) p.join() - util.debug('terminate pool finalized') def __enter__(self): From 4d614b3007fb9b500c05d361724c8a5d846914b4 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Mon, 12 Nov 2018 08:31:37 -0800 Subject: [PATCH 10/14] address some of @effigies' comments --- Lib/multiprocessing/pool.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 90cf774a308d46..5a6b73ae4cf1fc 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -178,7 +178,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), if processes is None: processes = os.cpu_count() or 1 if processes < 1: - raise ValueError("Number of processes must be 2 or more") + raise ValueError("Number of processes must be at least 1") if initializer is not None and not callable(initializer): raise TypeError('initializer must be a callable') @@ -296,7 +296,6 @@ def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, wrap_exception): """Clean up any exited workers and start replacements for them. """ - thread = threading.current_thread() need_repopulate = Pool._join_exited_workers(pool) if need_repopulate: Pool._repopulate_pool_static(ctx, Process, processes, pool, From 65f6eaf636310a79e83f8d567a39c169cad7f864 Mon Sep 17 00:00:00 2001 From: oesteban Date: Mon, 12 Nov 2018 12:22:36 -0800 Subject: [PATCH 11/14] protect changes of state of worker handler thread with lock --- Lib/multiprocessing/pool.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index e11fadbeb7a805..0ccfeac1de1cfc 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -470,6 +470,7 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, inqueue, outqueue, initializer, initargs, maxtasksperchild, wrap_exception): thread = threading.current_thread() + setattr(thread, '_state_lock', ctx.Lock()) util.debug('worker handler entering') # Keep maintaining workers until the cache gets drained, unless the pool @@ -480,7 +481,8 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, outqueue, initializer, initargs, maxtasksperchild, wrap_exception) if new_workers is None: - thread._state = BROKEN + with thread._state_lock: + thread._state = BROKEN for i, cache_ent in list(cache.items()): err = BrokenProcessPool( 'A worker of the pool terminated abruptly ' @@ -621,8 +623,9 @@ def close(self): if self._state == RUN: self._state = CLOSE # Avert race condition in broken pools - if self._worker_handler._state != BROKEN: - self._worker_handler._state = CLOSE + with self._worker_handler._state_lock: + if self._worker_handler._state != BROKEN: + self._worker_handler._state = CLOSE def terminate(self): util.debug('terminating pool') @@ -656,9 +659,10 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once util.debug('terminate pool 
entering') - is_broken = BROKEN in (task_handler._state, - worker_handler._state, - result_handler._state) + with worker_handler._state_lock: + is_broken = BROKEN in (task_handler._state, + worker_handler._state, + result_handler._state) worker_handler._state = TERMINATE task_handler._state = TERMINATE From 933c77a7dd86bf1ea1f9a63e5569ff387d8b366e Mon Sep 17 00:00:00 2001 From: oesteban Date: Mon, 17 Dec 2018 16:04:56 -0800 Subject: [PATCH 12/14] address @pitrou's comments --- Doc/library/multiprocessing.rst | 2 + Lib/multiprocessing/pool.py | 25 ++++++--- Lib/test/_test_multiprocessing.py | 86 +++++++++++++++++++++++++++---- 3 files changed, 96 insertions(+), 17 deletions(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 68eea2b66aabae..5752b4df1d2dc7 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2237,6 +2237,8 @@ with the :class:`Pool` class. one of the workers of a :class:`Pool` has terminated in a non-clean fashion (for example, if it was killed from the outside). + .. versionadded:: 3.8 + .. class:: AsyncResult diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index dd6a1228a83775..89ef63b85de143 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -290,12 +290,12 @@ def _maintain_pool(self): if need_repopulate is None: with self._worker_state_lock: self._worker_handler._state = BROKEN + + err = BrokenProcessPool( + 'A worker in the pool terminated abruptly.') + # Exhaust MapResult with errors for i, cache_ent in list(self._cache.items()): - err = BrokenProcessPool( - 'A worker of the pool terminated abruptly.') - # Exhaust MapResult with errors - while cache_ent._number_left > 0: - cache_ent._set(i, (False, err)) + cache_ent._set_all((False, err)) def _setup_queues(self): self._inqueue = self._ctx.SimpleQueue() @@ -679,7 +679,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, # worker has not yet exited util.debug('cleaning up worker %d' % p.pid) p.join() - util.debug('terminate pool finalized') + util.debug('terminate pool finished') def __enter__(self): self._check_running() @@ -731,6 +731,9 @@ def _set(self, i, obj): self._event.set() del self._cache[self._job] + def _set_all(self, obj): + self._set(0, obj) + AsyncResult = ApplyResult # create alias -- see #17805 # @@ -774,6 +777,12 @@ def _set(self, i, success_result): del self._cache[self._job] self._event.set() + def _set_all(self, obj): + item = 0 + while self._number_left > 0: + self._set(item, obj) + item += 1 + # # Class whose instances are returned by `Pool.imap()` # @@ -831,6 +840,10 @@ def _set(self, i, obj): if self._index == self._length: del self._cache[self._job] + def _set_all(self, obj): + while self._index != self._length: + self._set(self._index, obj) + def _set_length(self, length): with self._cond: self._length = length diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 0bd1cf37bc30b6..c72879ffe94ba3 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2584,10 +2584,13 @@ def raising(): def unpickleable_result(): return lambda: 42 -def waiting(args): - time.sleep(7) +def bad_exit_os(value): + if value: + from os import _exit as exit + # from sys import exit + exit(123) -def bad_exit(value): +def bad_exit_sys(value): if value: from sys import exit exit(123) @@ -2633,12 +2636,16 @@ def errback(exc): p.close() p.join() - def test_broken_process_pool1(self): + def 
test_external_signal_kills_worker_apply_async(self): + """mimics that a worker was killed from external signal""" from multiprocessing.pool import BrokenProcessPool p = multiprocessing.Pool(2) - res = p.map_async(waiting, range(10)) - # Kill one of the pool workers. - waiting(None) + res = p.apply_async(time.sleep, (5,)) + res = p.apply_async(time.sleep, (2,)) + res = p.apply_async(time.sleep, (1,)) + # Kill one of the pool workers, after some have entered + # execution (hence, the 0.5s wait) + time.sleep(0.5) pid = p._pool[0].pid os.kill(pid, signal.SIGTERM) with self.assertRaises(BrokenProcessPool): @@ -2646,11 +2653,35 @@ def test_broken_process_pool1(self): p.close() p.join() + def test_external_signal_kills_worker_imap_unordered(self): + """mimics that a worker was killed from external signal""" + from multiprocessing.pool import BrokenProcessPool + p = multiprocessing.Pool(2) + with self.assertRaises(BrokenProcessPool): + res = list(p.imap_unordered(bad_exit_os, [0, 0, 1, 0])) + p.close() + p.join() + + def test_external_signal_kills_worker_map_async1(self): + """mimics that a worker was killed from external signal""" + from multiprocessing.pool import BrokenProcessPool + p = multiprocessing.Pool(2) + res = p.map_async(time.sleep, [5] * 10) + # Kill one of the pool workers, after some have entered + # execution (hence, the 0.5s wait) + time.sleep(0.5) + pid = p._pool[0].pid + os.kill(pid, signal.SIGTERM) + with self.assertRaises(BrokenProcessPool): + res.get() + p.close() + p.join() - def test_broken_process_pool2(self): + def test_external_signal_kills_worker_map_async2(self): + """mimics that a worker was killed from external signal""" from multiprocessing.pool import BrokenProcessPool p = multiprocessing.Pool(2) - res = p.map_async(waiting, [1]) + res = p.map_async(time.sleep, (2, )) # Kill one of the pool workers. pid = p._pool[0].pid os.kill(pid, signal.SIGTERM) @@ -2659,14 +2690,47 @@ def test_broken_process_pool2(self): p.close() p.join() - def test_broken_process_pool3(self): + def test_map_async_with_broken_pool(self): + """submit task to a broken pool""" from multiprocessing.pool import BrokenProcessPool p = multiprocessing.Pool(2) + pid = p._pool[0].pid + res = p.map_async(time.sleep, (2, )) + # Kill one of the pool workers. 
+ os.kill(pid, signal.SIGTERM) with self.assertRaises(BrokenProcessPool): - res = p.map(bad_exit, [0, 0, 1, 0]) + res.get() + p.close() + p.join() + + def test_internal_signal_kills_worker_map1(self): + from multiprocessing.pool import BrokenProcessPool + p = multiprocessing.Pool(2) + with self.assertRaises(BrokenProcessPool): + res = p.map(bad_exit_os, [0, 0, 1, 0]) p.close() p.join() + def test_internal_signal_kills_worker_map2(self): + from multiprocessing.pool import BrokenProcessPool + p = multiprocessing.Pool(2) + with self.assertRaises(BrokenProcessPool): + res = p.map(bad_exit_sys, [0, 0, 1, 0]) + p.close() + p.join() + + def test_internal_signal_kills_worker_map_async3(self): + from multiprocessing.pool import BrokenProcessPool + p = multiprocessing.Pool(2) + res = p.map_async(time.sleep, [5] * 10) + # Kill one of the pool workers, after some have entered + # execution (hence, the 0.5s wait) + time.sleep(0.5) + p._pool[0].terminate() + with self.assertRaises(BrokenProcessPool): + res.get() + p.close() + p.join() class _TestPoolWorkerLifetime(BaseTestCase): ALLOWED_TYPES = ('processes', ) From efcc185f128522d3d1e1f6e04a38d2cae46f44d1 Mon Sep 17 00:00:00 2001 From: oesteban Date: Mon, 17 Dec 2018 16:20:25 -0800 Subject: [PATCH 13/14] fix typo --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 89ef63b85de143..5e1848f2f8aadb 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -642,7 +642,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) else: - util.debug('finalizing BROKEN procress pool') + util.debug('finalishing BROKEN process pool') if (not result_handler.is_alive()) and (len(cache) != 0): raise AssertionError( From 7c21dddc0f0a5a6bd5249ba3dd8eed7342f2bf60 Mon Sep 17 00:00:00 2001 From: oesteban Date: Mon, 17 Dec 2018 16:20:37 -0800 Subject: [PATCH 14/14] fix typo (sorry for the rebound commit) --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 5e1848f2f8aadb..0cd7984455cc2c 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -642,7 +642,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) else: - util.debug('finalishing BROKEN process pool') + util.debug('finishing BROKEN process pool') if (not result_handler.is_alive()) and (len(cache) != 0): raise AssertionError(
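Taken together, the end state of the series concentrates the breakage detection in Pool._join_exited_workers: it reaps exited workers and, when any of them died with a non-zero exit code, terminates the survivors instead of repopulating the pool, after which _maintain_pool fails every pending job with BrokenProcessPool (via the _set_all() helpers added in PATCH 12). As a compact reference, here is that detection logic restated as a stand-alone function; this is a simplified model of the patched method, not a drop-in replacement, and procs is assumed to be a plain list of multiprocessing.Process objects:

    def join_exited_workers(procs):
        # Reap workers that have exited. Return None when any worker
        # died with a non-zero exit code (the pool is broken), True when
        # workers were cleaned up normally, False when nothing changed.
        cleaned = False
        broken = []
        for i, p in reversed(list(enumerate(procs))):
            broken.append(p.exitcode not in (None, 0))
            if p.exitcode is not None:
                p.join()
                cleaned = True
                del procs[i]
        if any(broken):
            # A worker crashed: terminate the survivors rather than
            # repopulating; the caller then marks every pending job
            # with BrokenProcessPool.
            for p in procs:
                if p.exitcode is None:
                    p.terminate()
            for p in procs:
                p.join()
            del procs[:]
            return None
        return cleaned

Returning None rather than raising keeps the worker-handler loop in control: it can flip its own state to BROKEN, drain the result cache with errors, and still send the shutdown sentinel, which is why terminate() and close() need the BROKEN-aware guards added in PATCHES 05 and 11.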