diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index ec6766c4..4068b84d 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -139,6 +139,7 @@ def __init__( super().__init__(max_cores=executor_kwargs.get("max_cores", None)) executor_kwargs["future_queue"] = self._future_queue executor_kwargs["spawner"] = spawner + executor_kwargs["queue_join_on_shutdown"] = False self._set_process( process=[ RaisingThread( @@ -209,6 +210,7 @@ def execute_parallel_tasks( hostname_localhost: Optional[bool] = None, init_function: Optional[Callable] = None, cache_directory: Optional[str] = None, + queue_join_on_shutdown: bool = True, **kwargs, ) -> None: """ @@ -227,6 +229,7 @@ def execute_parallel_tasks( option to true init_function (Callable): optional function to preset arguments for functions which are submitted later cache_directory (str, optional): The directory to store cache files. Defaults to "cache". + queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True. """ interface = interface_bootup( command_lst=_get_backend_path( @@ -244,7 +247,8 @@ def execute_parallel_tasks( if "shutdown" in task_dict.keys() and task_dict["shutdown"]: interface.shutdown(wait=task_dict["wait"]) future_queue.task_done() - future_queue.join() + if queue_join_on_shutdown: + future_queue.join() break elif "fn" in task_dict.keys() and "future" in task_dict.keys(): if cache_directory is None: diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 5c03b49a..5e5949cf 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -40,6 +40,10 @@ def merge(lst): return sum(lst) +def raise_error(): + raise RuntimeError + + class TestExecutorWithDependencies(unittest.TestCase): def test_executor(self): with Executor(max_cores=1, backend="local") as exe: @@ -227,3 +231,29 @@ def test_many_to_one_plot(self): ) self.assertEqual(len(nodes), 18) self.assertEqual(len(edges), 21) + + +class TestExecutorErrors(unittest.TestCase): + def test_block_allocation_false_one_worker(self): + with self.assertRaises(RuntimeError): + with Executor(max_cores=1, backend="local", block_allocation=False) as exe: + cloudpickle_register(ind=1) + _ = exe.submit(raise_error) + + def test_block_allocation_true_one_worker(self): + with self.assertRaises(RuntimeError): + with Executor(max_cores=1, backend="local", block_allocation=True) as exe: + cloudpickle_register(ind=1) + _ = exe.submit(raise_error) + + def test_block_allocation_false_two_workers(self): + with self.assertRaises(RuntimeError): + with Executor(max_cores=2, backend="local", block_allocation=False) as exe: + cloudpickle_register(ind=1) + _ = exe.submit(raise_error) + + def test_block_allocation_true_two_workers(self): + with self.assertRaises(RuntimeError): + with Executor(max_cores=2, backend="local", block_allocation=True) as exe: + cloudpickle_register(ind=1) + _ = exe.submit(raise_error)