diff --git a/executorlib/standalone/cache.py b/executorlib/standalone/cache.py index 8470e87a..fd3def78 100644 --- a/executorlib/standalone/cache.py +++ b/executorlib/standalone/cache.py @@ -28,9 +28,8 @@ def get_cache_data(cache_directory: str) -> list[dict]: file_lst = [] for task_key in os.listdir(cache_directory): - file_name = os.path.join(cache_directory, task_key, "cache.h5out") - os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) - if os.path.exists(file_name): + file_name = os.path.join(cache_directory, task_key) + if task_key[-5:] == "_o.h5": with h5py.File(file_name, "r") as hdf: file_content_dict = { key: cloudpickle.loads(np.void(hdf["/" + key])) diff --git a/executorlib/task_scheduler/file/backend.py b/executorlib/task_scheduler/file/backend.py index f5c846f8..4ea27c17 100644 --- a/executorlib/task_scheduler/file/backend.py +++ b/executorlib/task_scheduler/file/backend.py @@ -42,19 +42,19 @@ def backend_write_file(file_name: str, output: Any, runtime: float) -> None: None """ - file_name_out = os.path.splitext(file_name)[0] - os.rename(file_name, file_name_out + ".h5ready") + file_name_out = os.path.splitext(file_name)[0][:-2] + os.rename(file_name, file_name_out + "_r.h5") if "result" in output: dump( - file_name=file_name_out + ".h5ready", + file_name=file_name_out + "_r.h5", data_dict={"output": output["result"], "runtime": runtime}, ) else: dump( - file_name=file_name_out + ".h5ready", + file_name=file_name_out + "_r.h5", data_dict={"error": output["error"], "runtime": runtime}, ) - os.rename(file_name_out + ".h5ready", file_name_out + ".h5out") + os.rename(file_name_out + "_r.h5", file_name_out + "_o.h5") def backend_execute_task_in_file(file_name: str) -> None: diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 313a8b68..02ab5c0e 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -108,13 +108,9 @@ def execute_tasks_h5( resource_dict=task_resource_dict, ) if task_key not in memory_dict: - if not ( - task_key in os.listdir(cache_directory) - and "cache.h5out" - in os.listdir(os.path.join(cache_directory, task_key)) - ): - os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) - file_name = os.path.join(cache_directory, task_key, "cache.h5in") + if task_key + "_o.h5" not in os.listdir(cache_directory): + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, task_key + "_i.h5") dump(file_name=file_name, data_dict=data_dict) if not disable_dependencies: task_dependent_lst = [ @@ -138,10 +134,10 @@ def execute_tasks_h5( resource_dict=task_resource_dict, config_directory=pysqa_config_directory, backend=backend, - cache_directory=os.path.join(cache_directory, task_key), + cache_directory=cache_directory, ) file_name_dict[task_key] = os.path.join( - cache_directory, task_key, "cache.h5out" + cache_directory, task_key + "_o.h5" ) memory_dict[task_key] = task_dict["future"] future_queue.task_done() @@ -197,7 +193,7 @@ def _check_task_output( Future: The updated future object. """ - file_name = os.path.join(cache_directory, task_key, "cache.h5out") + file_name = os.path.join(cache_directory, task_key + "_o.h5") if not os.path.exists(file_name): return future_obj exec_flag, no_error_flag, result = get_output(file_name=file_name) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 3ed3bc28..cd44382c 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -151,12 +151,9 @@ def _execute_task_with_cache( fn_kwargs=task_dict["kwargs"], resource_dict=task_dict.get("resource_dict", {}), ) - os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) - file_name = os.path.join(cache_directory, task_key, "cache.h5out") - if not ( - task_key in os.listdir(cache_directory) - and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key)) - ): + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, task_key + "_o.h5") + if task_key + "_o.h5" not in os.listdir(cache_directory): f = task_dict.pop("future") if f.set_running_or_notify_cancel(): try: diff --git a/tests/test_cache_backend_execute.py b/tests/test_cache_backend_execute.py index f780070f..b12495e8 100644 --- a/tests/test_cache_backend_execute.py +++ b/tests/test_cache_backend_execute.py @@ -35,8 +35,8 @@ def test_execute_function_mixed(self): fn_args=[1], fn_kwargs={"b": 2}, ) - file_name = os.path.join(cache_directory, task_key, "cache.h5in") - os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + file_name = os.path.join(cache_directory, task_key + "_i.h5") + os.makedirs(cache_directory, exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -46,11 +46,11 @@ def test_execute_function_mixed(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key, "cache.h5out") + file_name=os.path.join(cache_directory, task_key + "_o.h5") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) @@ -63,7 +63,7 @@ def test_execute_function_args(self): fn_args=[1, 2], fn_kwargs={}, ) - file_name = os.path.join(cache_directory, task_key, "cache.h5in") + file_name = os.path.join(cache_directory, task_key + "_i.h5") os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) @@ -74,11 +74,11 @@ def test_execute_function_args(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key, "cache.h5out") + file_name=os.path.join(cache_directory, task_key + "_o.h5") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) @@ -91,8 +91,8 @@ def test_execute_function_kwargs(self): fn_args=[], fn_kwargs={"a": 1, "b": 2}, ) - file_name = os.path.join(cache_directory, task_key, "cache.h5in") - os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + file_name = os.path.join(cache_directory, task_key + "_i.h5") + os.makedirs(cache_directory, exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -102,11 +102,11 @@ def test_execute_function_kwargs(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key, "cache.h5out") + file_name=os.path.join(cache_directory, task_key + "_o.h5") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) @@ -119,8 +119,8 @@ def test_execute_function_error(self): fn_args=[], fn_kwargs={"a": 1}, ) - file_name = os.path.join(cache_directory, task_key, "cache.h5in") - os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + file_name = os.path.join(cache_directory, task_key + "_i.h5") + os.makedirs(cache_directory, exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -131,11 +131,11 @@ def test_execute_function_error(self): with self.assertRaises(ValueError): future_obj.result() self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key, "cache.h5out") + file_name=os.path.join(cache_directory, task_key + "_o.h5") ) self.assertTrue(future_file_obj.done()) with self.assertRaises(ValueError):