Skip to content

Commit f91b4e1

Browse files
authored
Implement task priorities and the new scheduler. (#8)
1 parent 2781f71 commit f91b4e1

File tree

7 files changed

+77
-297
lines changed

7 files changed

+77
-297
lines changed

.pre-commit-config.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,18 @@ repos:
99
exclude: meta.yaml
1010
- id: debug-statements
1111
- id: end-of-file-fixer
12+
- repo: https://github.com/pre-commit/pygrep-hooks
13+
rev: v1.7.0 # Use the ref you want to point at
14+
hooks:
15+
- id: python-check-blanket-noqa
16+
- id: python-check-mock-methods
17+
- id: python-no-eval
18+
- id: python-no-log-warn
19+
- id: python-use-type-annotations
20+
- id: rst-backticks
21+
- id: rst-directive-colons
22+
- id: rst-inline-touching-normal
23+
- id: text-unicode-replacement-char
1224
- repo: https://github.com/asottile/pyupgrade
1325
rev: v2.7.4
1426
hooks:

CHANGES.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ all releases are available on `Anaconda.org
1111
------------------
1212

1313
- :gh:`5` fixes the CI and other smaller issues.
14+
- :gh:`8` aligns pytask-parallel with task priorities in pytask v0.0.11.
1415
- :gh:`9` enables --max-failures. Closes :gh:`7`.
1516

1617

README.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
pytask-parallel
1919
===============
2020

21-
Parallelize the execution of tasks with `pytask-parallel` which is a plugin for `pytask
22-
<https://github.com/pytask-dev/pytask>`_.
21+
Parallelize the execution of tasks with ``pytask-parallel`` which is a plugin for
22+
`pytask <https://github.com/pytask-dev/pytask>`_.
2323

2424

2525
Installation

src/pytask_parallel/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
"""The entry-point for pytask-parallel."""
12
__version__ = "0.0.4"

src/pytask_parallel/execute.py

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@
33
import time
44

55
import cloudpickle
6-
import networkx as nx
76
from _pytask.config import hookimpl
87
from _pytask.report import ExecutionReport
98
from pytask_parallel.backends import PARALLEL_BACKENDS
10-
from pytask_parallel.scheduler import TopologicalSorter
119

1210

1311
@hookimpl
@@ -19,23 +17,6 @@ def pytask_post_parse(config):
1917
config["pm"].register(DefaultBackendNameSpace)
2018

2119

22-
@hookimpl(tryfirst=True)
23-
def pytask_execute_create_scheduler(session):
24-
"""Create the scheduler."""
25-
if session.config["n_workers"] > 1:
26-
task_names = {task.name for task in session.tasks}
27-
task_dict = {
28-
name: nx.ancestors(session.dag, name) & task_names for name in task_names
29-
}
30-
scheduler = TopologicalSorter(task_dict)
31-
32-
# Forbid to add further nodes and check for cycles. The latter should have been
33-
# taken care of while setting up the DAG.
34-
scheduler.prepare()
35-
36-
return scheduler
37-
38-
3920
@hookimpl(tryfirst=True)
4021
def pytask_execute_build(session):
4122
"""Execute tasks with a parallel backend.
@@ -61,7 +42,12 @@ def pytask_execute_build(session):
6142
while session.scheduler.is_active():
6243

6344
newly_collected_reports = []
64-
ready_tasks = list(session.scheduler.get_ready())
45+
n_new_tasks = session.config["n_workers"] - len(running_tasks)
46+
47+
if n_new_tasks >= 1:
48+
ready_tasks = list(session.scheduler.get_ready(n_new_tasks))
49+
else:
50+
ready_tasks = []
6551

6652
for task_name in ready_tasks:
6753
task = session.dag.nodes[task_name]["task"]
@@ -132,6 +118,8 @@ def pytask_execute_build(session):
132118

133119

134120
class ProcessesNameSpace:
121+
"""The name space for hooks related to processes."""
122+
135123
@hookimpl(tryfirst=True)
136124
def pytask_execute_task(session, task): # noqa: N805
137125
"""Execute a task.
@@ -156,6 +144,8 @@ def unserialize_and_execute_task(bytes_):
156144

157145

158146
class DefaultBackendNameSpace:
147+
"""The name space for hooks related to threads."""
148+
159149
@hookimpl(tryfirst=True)
160150
def pytask_execute_task(session, task): # noqa: N805
161151
"""Execute a task.

src/pytask_parallel/scheduler.py

Lines changed: 0 additions & 271 deletions
This file was deleted.

0 commit comments

Comments
 (0)