Skip to content
4 changes: 2 additions & 2 deletions executorlib/backend/interactive_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def main() -> None:
host=argument_dict["host"], port=argument_dict["zmqport"]
)

memory = None
memory = {"executorlib_worker_id": int(argument_dict["worker_id"])}

# required for flux interface - otherwise the current path is not included in the python path
cwd = abspath(".")
Expand Down Expand Up @@ -97,7 +97,7 @@ def main() -> None:
and "args" in input_dict
and "kwargs" in input_dict
):
memory = call_funct(input_dict=input_dict, funct=None)
memory.update(call_funct(input_dict=input_dict, funct=None, memory=memory))


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions executorlib/backend/interactive_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def main(argument_lst: Optional[list[str]] = None):
host=argument_dict["host"], port=argument_dict["zmqport"]
)

memory = None
memory = {"executorlib_worker_id": int(argument_dict["worker_id"])}

# required for flux interface - otherwise the current path is not included in the python path
cwd = abspath(".")
Expand Down Expand Up @@ -72,7 +72,7 @@ def main(argument_lst: Optional[list[str]] = None):
and "args" in input_dict
and "kwargs" in input_dict
):
memory = call_funct(input_dict=input_dict, funct=None)
memory.update(call_funct(input_dict=input_dict, funct=None, memory=memory))


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion executorlib/standalone/interactive/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ def parse_arguments(argument_lst: list[str]) -> dict:
argument_dict={
"zmqport": "--zmqport",
"host": "--host",
"worker_id": "--worker-id",
},
default_dict={"host": "localhost"},
default_dict={"host": "localhost", "worker_id": 0},
)


Expand Down
5 changes: 5 additions & 0 deletions executorlib/standalone/interactive/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def interface_bootup(
connections,
hostname_localhost: Optional[bool] = None,
log_obj_size: bool = False,
worker_id: Optional[int] = None,
) -> SocketInterface:
"""
Start interface for ZMQ communication
Expand All @@ -152,6 +153,8 @@ def interface_bootup(
this look up for security reasons. So on MacOS it is required to set this
option to true
log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
distribution.

Returns:
executorlib.shared.communication.SocketInterface: socket interface for zmq communication
Expand All @@ -165,6 +168,8 @@ def interface_bootup(
"--host",
gethostname(),
]
if worker_id is not None:
command_lst += ["--worker-id", str(worker_id)]
interface = SocketInterface(
spawner=connections,
log_obj_size=log_obj_size,
Expand Down
4 changes: 2 additions & 2 deletions executorlib/task_scheduler/interactive/blockallocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ def __init__(
process=[
Thread(
target=execute_tasks,
kwargs=executor_kwargs,
kwargs=executor_kwargs | {"worker_id": worker_id},
)
for _ in range(self._max_workers)
for worker_id in range(self._max_workers)
],
)

Expand Down
4 changes: 4 additions & 0 deletions executorlib/task_scheduler/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def execute_tasks(
queue_join_on_shutdown: bool = True,
log_obj_size: bool = False,
error_log_file: Optional[str] = None,
worker_id: Optional[int] = None,
**kwargs,
) -> None:
"""
Expand All @@ -49,6 +50,8 @@ def execute_tasks(
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
submitted to the Executor.
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
distribution.
"""
interface = interface_bootup(
command_lst=get_interactive_execute_command(
Expand All @@ -57,6 +60,7 @@ def execute_tasks(
connections=spawner(cores=cores, **kwargs),
hostname_localhost=hostname_localhost,
log_obj_size=log_obj_size,
worker_id=worker_id,
)
if init_function is not None:
interface.send_dict(
Expand Down
65 changes: 65 additions & 0 deletions tests/test_singlenodeexecutor_noblock.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
from time import sleep

from executorlib import SingleNodeExecutor
from executorlib.standalone.serialize import cloudpickle_register
Expand All @@ -12,6 +13,15 @@ def resource_dict(resource_dict):
return resource_dict


def get_worker_id(executorlib_worker_id):
sleep(0.1)
return executorlib_worker_id


def init_function():
return {"a": 1, "b": 2}


class TestExecutorBackend(unittest.TestCase):
def test_meta_executor_serial_with_dependencies(self):
with SingleNodeExecutor(
Expand Down Expand Up @@ -75,3 +85,58 @@ def test_errors(self):
block_allocation=True,
) as exe:
exe.submit(resource_dict, resource_dict={})


class TestWorkerID(unittest.TestCase):
def test_block_allocation_True(self):
with SingleNodeExecutor(
max_cores=1,
block_allocation=True,
) as exe:
worker_id = exe.submit(get_worker_id, resource_dict={}).result()
self.assertEqual(worker_id, 0)

def test_block_allocation_True_two_workers(self):
with SingleNodeExecutor(
max_cores=2,
block_allocation=True,
) as exe:
f1_worker_id = exe.submit(get_worker_id, resource_dict={})
f2_worker_id = exe.submit(get_worker_id, resource_dict={})
self.assertEqual(sum([f1_worker_id.result(), f2_worker_id.result()]), 1)

def test_init_function(self):
with SingleNodeExecutor(
max_cores=1,
block_allocation=True,
init_function=init_function,
) as exe:
worker_id = exe.submit(get_worker_id, resource_dict={}).result()
self.assertEqual(worker_id, 0)

def test_init_function_two_workers(self):
with SingleNodeExecutor(
max_cores=2,
block_allocation=True,
init_function=init_function,
) as exe:
f1_worker_id = exe.submit(get_worker_id, resource_dict={})
f2_worker_id = exe.submit(get_worker_id, resource_dict={})
self.assertEqual(sum([f1_worker_id.result(), f2_worker_id.result()]), 1)

def test_block_allocation_False(self):
with SingleNodeExecutor(
max_cores=1,
block_allocation=False,
) as exe:
worker_id = exe.submit(get_worker_id, resource_dict={}).result()
self.assertEqual(worker_id, 0)

def test_block_allocation_False_two_workers(self):
with SingleNodeExecutor(
max_cores=2,
block_allocation=False,
) as exe:
f1_worker_id = exe.submit(get_worker_id, resource_dict={})
f2_worker_id = exe.submit(get_worker_id, resource_dict={})
self.assertEqual(sum([f1_worker_id.result(), f2_worker_id.result()]), 0)
3 changes: 3 additions & 0 deletions tests/test_standalone_interactive_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class TestParser(unittest.TestCase):
def test_command_local(self):
result_dict = {
"host": "localhost",
"worker_id": 0,
"zmqport": "22",
}
command_lst = [
Expand All @@ -35,6 +36,7 @@ def test_command_local(self):
def test_command_slurm(self):
result_dict = {
"host": "127.0.0.1",
"worker_id": 0,
"zmqport": "22",
}
command_lst = [
Expand Down Expand Up @@ -76,6 +78,7 @@ def test_command_slurm(self):
def test_command_slurm_user_command(self):
result_dict = {
"host": "127.0.0.1",
"worker_id": 0,
"zmqport": "22",
}
command_lst = [
Expand Down
Loading