Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions executorlib/task_scheduler/file/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ def execute_tasks_h5(
with contextlib.suppress(queue.Empty):
task_dict = future_queue.get_nowait()
if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]:
while len(memory_dict) > 0:
memory_dict = {
key: _check_task_output(
task_key=key,
future_obj=value,
cache_directory=cache_dir_dict[key],
)
for key, value in memory_dict.items()
if not value.done()
}
Comment on lines +89 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add timeout and sleep to prevent busy waiting and infinite loops.

The implementation correctly ensures all pending futures complete before shutdown, but the busy-wait loop could consume unnecessary CPU resources and potentially run indefinitely if futures never complete.

Consider adding a timeout mechanism and sleep interval to improve performance and reliability:

+import time
+
 while len(memory_dict) > 0:
+    time.sleep(0.1)  # Add small delay to prevent busy waiting
     memory_dict = {
         key: _check_task_output(
             task_key=key,
             future_obj=value,
             cache_directory=cache_dir_dict[key],
         )
         for key, value in memory_dict.items()
         if not value.done()
     }
+    # Optional: Add timeout to prevent infinite loops
+    # if time.time() - start_time > max_wait_time:
+    #     break

This prevents excessive CPU usage while waiting for futures to complete and provides an escape mechanism for stuck futures.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
while len(memory_dict) > 0:
memory_dict = {
key: _check_task_output(
task_key=key,
future_obj=value,
cache_directory=cache_dir_dict[key],
)
for key, value in memory_dict.items()
if not value.done()
}
import time
while len(memory_dict) > 0:
time.sleep(0.1) # Add small delay to prevent busy waiting
memory_dict = {
key: _check_task_output(
task_key=key,
future_obj=value,
cache_directory=cache_dir_dict[key],
)
for key, value in memory_dict.items()
if not value.done()
}
# Optional: Add timeout to prevent infinite loops
# if time.time() - start_time > max_wait_time:
# break
🤖 Prompt for AI Agents
In executorlib/task_scheduler/file/shared.py around lines 89 to 98, the current
loop busy-waits while checking for incomplete futures, which can cause high CPU
usage and potential infinite loops. To fix this, add a sleep interval inside the
loop to pause briefly between checks, reducing CPU consumption. Additionally,
implement a timeout mechanism that breaks the loop if futures do not complete
within a reasonable time, preventing infinite waiting. Use time tracking to
enforce the timeout and ensure the loop exits gracefully.

if terminate_function is not None:
for task in process_dict.values():
terminate_function(task=task)
Expand Down
Loading