Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import queue
import sys
import time
from asyncio.exceptions import CancelledError
from concurrent.futures import Future
from time import sleep
from typing import Any, Callable, Optional, Union
Expand Down Expand Up @@ -361,7 +362,10 @@
task_dict is not None and "fn" in task_dict and "future" in task_dict
):
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
if len(future_lst) == 0 or ready_flag:
exception_lst = _get_exception_lst(future_lst=future_lst)
if len(exception_lst) > 0:
task_dict["future"].set_exception(exception_lst[0])
elif len(future_lst) == 0 or ready_flag:
# No future objects are used in the input or all future objects are already done
task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
Expand Down Expand Up @@ -455,7 +459,10 @@
"""
wait_tmp_lst = []
for task_wait_dict in wait_lst:
if all(future.done() for future in task_wait_dict["future_lst"]):
exception_lst = _get_exception_lst(future_lst=task_wait_dict["future_lst"])
if len(exception_lst) > 0:
task_wait_dict["future"].set_exception(exception_lst[0])
elif all(future.done() for future in task_wait_dict["future_lst"]):
del task_wait_dict["future_lst"]
task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
Expand Down Expand Up @@ -663,3 +670,17 @@
future = task_dict["future"]
future.set_result(result)
future_queue.task_done()


def _get_exception_lst(future_lst: list) -> list:
def get_exception(future_obj: Future) -> bool:
try:
excp = future_obj.exception(timeout=10**-10)
return excp is not None and not isinstance(excp, CancelledError)
except TimeoutError:
return False

if sys.version_info[0] >= 3 and sys.version_info[1] >= 11:
return [f.exception() for f in future_lst if get_exception(future_obj=f)]
else:
return []

Check warning on line 686 in executorlib/interactive/shared.py

View check run for this annotation

Codecov / codecov/patch

executorlib/interactive/shared.py#L686

Added line #L686 was not covered by tests
188 changes: 183 additions & 5 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from concurrent.futures import Future
import unittest
import sys
from time import sleep
from queue import Queue

Expand Down Expand Up @@ -42,7 +43,7 @@ def return_input_dict(input_dict):
return input_dict


def raise_error():
def raise_error(parameter):
raise RuntimeError


Expand Down Expand Up @@ -106,6 +107,119 @@ def test_dependency_steps(self):
self.assertTrue(fs2.done())
q.put({"shutdown": True, "wait": True})

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_dependency_steps_error(self):
cloudpickle_register(ind=1)
fs1 = Future()
fs2 = Future()
q = Queue()
q.put(
{
"fn": raise_error,
"args": (),
"kwargs": {"parameter": 0},
"future": fs1,
"resource_dict": {"cores": 1},
}
)
q.put(
{
"fn": add_function,
"args": (),
"kwargs": {"parameter_1": 1, "parameter_2": fs1},
"future": fs2,
"resource_dict": {"cores": 1},
}
)
executor = create_single_node_executor(
max_workers=1,
max_cores=2,
resource_dict={
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
},
)
process = RaisingThread(
target=execute_tasks_with_dependencies,
kwargs={
"future_queue": q,
"executor_queue": executor._future_queue,
"executor": executor,
"refresh_rate": 0.01,
},
)
process.start()
self.assertFalse(fs1.done())
self.assertFalse(fs2.done())
self.assertTrue(fs1.exception() is not None)
self.assertTrue(fs2.exception() is not None)
with self.assertRaises(RuntimeError):
fs2.result()
q.put({"shutdown": True, "wait": True})

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_dependency_steps_error_before(self):
cloudpickle_register(ind=1)
fs1 = Future()
fs1.set_exception(RuntimeError())
fs2 = Future()
q = Queue()
q.put(
{
"fn": add_function,
"args": (),
"kwargs": {"parameter_1": 1, "parameter_2": 2},
"future": fs1,
"resource_dict": {"cores": 1},
}
)
q.put(
{
"fn": add_function,
"args": (),
"kwargs": {"parameter_1": 1, "parameter_2": fs1},
"future": fs2,
"resource_dict": {"cores": 1},
}
)
executor = create_single_node_executor(
max_workers=1,
max_cores=2,
resource_dict={
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
},
)
process = RaisingThread(
target=execute_tasks_with_dependencies,
kwargs={
"future_queue": q,
"executor_queue": executor._future_queue,
"executor": executor,
"refresh_rate": 0.01,
},
)
process.start()
self.assertTrue(fs1.exception() is not None)
self.assertTrue(fs2.exception() is not None)
with self.assertRaises(RuntimeError):
fs2.result()
q.put({"shutdown": True, "wait": True})

def test_many_to_one(self):
length = 5
parameter = 1
Expand Down Expand Up @@ -148,22 +262,86 @@ def test_block_allocation_false_one_worker(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
_ = exe.submit(raise_error, parameter=0)

def test_block_allocation_true_one_worker(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
_ = exe.submit(raise_error, parameter=0)

def test_block_allocation_false_two_workers(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
_ = exe.submit(raise_error, parameter=0)

def test_block_allocation_true_two_workers(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
_ = exe.submit(raise_error, parameter=0)

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_false_one_worker_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe:
cloudpickle_register(ind=1)
lst = []
for i in range(1, 4):
lst = exe.submit(
raise_error,
parameter=lst,
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_true_one_worker_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe:
cloudpickle_register(ind=1)
lst = []
for i in range(1, 4):
lst = exe.submit(
raise_error,
parameter=lst,
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_false_two_workers_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe:
cloudpickle_register(ind=1)
lst = []
for i in range(1, 4):
lst = exe.submit(
raise_error,
parameter=lst,
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_true_two_workers_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe:
cloudpickle_register(ind=1)
lst = []
for i in range(1, 4):
lst = exe.submit(
raise_error,
parameter=lst,
)
lst.result()
Loading