Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions executorlib/task_scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from concurrent.futures import (
Future,
)
from concurrent.futures import (
wait as wait_for_futures,
)
from threading import Thread
from typing import Callable, Optional, Union

Expand All @@ -31,6 +34,7 @@
self._max_cores = max_cores
self._future_queue: Optional[queue.Queue] = queue.Queue()
self._process: Optional[Union[Thread, list[Thread]]] = None
self._futures: set[Future] = set()

@property
def max_workers(self) -> Optional[int]:
Expand Down Expand Up @@ -124,6 +128,7 @@
"resource_dict": resource_dict,
}
)
self._futures.add(f)
return f

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
Expand All @@ -143,11 +148,17 @@
"""
if cancel_futures and self._future_queue is not None:
cancel_items_in_queue(que=self._future_queue)
if cancel_futures:
for f in self._futures:
f.cancel()

Check warning on line 153 in executorlib/task_scheduler/base.py

View check run for this annotation

Codecov / codecov/patch

executorlib/task_scheduler/base.py#L152-L153

Added lines #L152 - L153 were not covered by tests
if wait:
wait_for_futures(self._futures)
if self._process is not None and self._future_queue is not None:
self._future_queue.put({"shutdown": True, "wait": wait})
if wait and isinstance(self._process, Thread):
self._process.join()
self._future_queue.join()
self._futures.clear()
self._process = None
self._future_queue = None

Expand Down
Loading