diff --git a/.conda/meta.yaml b/.conda/meta.yaml
index 5589bb4..27fc339 100644
--- a/.conda/meta.yaml
+++ b/.conda/meta.yaml
@@ -21,7 +21,7 @@ requirements:
run:
- python >=3.6
- cloudpickle
- - pytask >=0.0.5
+ - pytask >=0.0.6
test:
requires:
@@ -32,6 +32,8 @@ test:
commands:
- pytask --version
- pytask --help
+ - pytask clean
+ - pytask markers
- pytest tests
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 170e76e..f1d6b05 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -19,11 +19,11 @@ repos:
hooks:
- id: reorder-python-imports
- repo: https://github.com/psf/black
- rev: 19.10b0
+ rev: 20.8b1
hooks:
- id: black
- repo: https://github.com/asottile/blacken-docs
- rev: v1.7.0
+ rev: v1.8.0
hooks:
- id: blacken-docs
additional_dependencies: [black]
@@ -48,9 +48,14 @@ repos:
Pygments,
]
- repo: https://github.com/PyCQA/doc8
- rev: 0.8.1rc3
+ rev: 0.9.0a1
hooks:
- id: doc8
+- repo: https://github.com/econchick/interrogate
+ rev: 1.3.1
+ hooks:
+ - id: interrogate
+ args: [-v, --fail-under=40, src, tests]
- repo: https://github.com/codespell-project/codespell
rev: v1.17.1
hooks:
diff --git a/CHANGES.rst b/CHANGES.rst
index 0c04115..a0f5fae 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -6,11 +6,17 @@ chronological order. Releases follow `semantic versioning `
all releases are available on `Anaconda.org `_.
+0.0.3 - 2020-09-12
+------------------
+
+- :gh:`3` align the program with pytask v0.0.6.
+
+
0.0.2 - 2020-08-12
------------------
-- :gh:`2` prepares the plugin for pytask v0.0.5.
-- :gh:`3` better parsing and callbacks.
+- :gh:`1` prepares the plugin for pytask v0.0.5.
+- :gh:`2` better parsing and callbacks.
0.0.1 - 2020-07-17
diff --git a/environment.yml b/environment.yml
index dacd248..39a8790 100644
--- a/environment.yml
+++ b/environment.yml
@@ -12,7 +12,7 @@ dependencies:
- conda-verify
# Package dependencies
- - pytask >= 0.0.5
+ - pytask >= 0.0.6
- cloudpickle
# Misc
diff --git a/setup.cfg b/setup.cfg
index 2d9a01a..4e03a17 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.0.2
+current_version = 0.0.3
parse = (?P\d+)\.(?P\d+)(\.(?P\d+))(\-?((dev)?(?P\d+))?)
serialize =
{major}.{minor}.{patch}dev{dev}
diff --git a/setup.py b/setup.py
index 13616d4..3780757 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
setup(
name="pytask-parallel",
- version="0.0.2",
+ version="0.0.3",
packages=find_packages(where="src"),
package_dir={"": "src"},
entry_points={"pytask": ["pytask_parallel = pytask_parallel.plugin"]},
diff --git a/src/pytask_parallel/__init__.py b/src/pytask_parallel/__init__.py
index 3b93d0b..27fdca4 100644
--- a/src/pytask_parallel/__init__.py
+++ b/src/pytask_parallel/__init__.py
@@ -1 +1 @@
-__version__ = "0.0.2"
+__version__ = "0.0.3"
diff --git a/src/pytask_parallel/cli.py b/src/pytask_parallel/build.py
similarity index 73%
rename from src/pytask_parallel/cli.py
rename to src/pytask_parallel/build.py
index bb1fec8..985122e 100644
--- a/src/pytask_parallel/cli.py
+++ b/src/pytask_parallel/build.py
@@ -1,11 +1,11 @@
+"""Extend the build command."""
import click
from _pytask.config import hookimpl
-from pytask_parallel.callbacks import delay_click_callback
-from pytask_parallel.callbacks import n_workers_click_callback
@hookimpl
-def pytask_add_parameters_to_cli(command):
+def pytask_extend_command_line_interface(cli):
+ """Extend the command line interface."""
additional_parameters = [
click.Option(
["-n", "--n-workers"],
@@ -14,12 +14,13 @@ def pytask_add_parameters_to_cli(command):
"os.cpu_count() - 1. [default: 1 (no parallelization)]"
),
metavar="[INTEGER|auto]",
- callback=n_workers_click_callback,
+ default=None,
),
click.Option(
["--parallel-backend"],
type=click.Choice(["processes", "threads"]),
help="Backend for the parallelization. [default: processes]",
+ default=None,
),
click.Option(
["--delay"],
@@ -28,7 +29,7 @@ def pytask_add_parameters_to_cli(command):
"(seconds)]"
),
metavar="NUMBER > 0",
- callback=delay_click_callback,
+ default=None,
),
]
- command.params.extend(additional_parameters)
+ cli.commands["build"].params.extend(additional_parameters)
diff --git a/src/pytask_parallel/callbacks.py b/src/pytask_parallel/callbacks.py
index 8adfdb7..ce1cb5e 100644
--- a/src/pytask_parallel/callbacks.py
+++ b/src/pytask_parallel/callbacks.py
@@ -1,59 +1,42 @@
-import click
-
-
-def n_workers_click_callback(ctx, name, value): # noqa: U100
- return n_workers_callback(value)
+"""Validate command line inputs and configuration values."""
def n_workers_callback(value):
- error_occurred = False
+ """Validate the n-workers option."""
if value == "auto":
pass
- elif value is None:
- pass
+ elif value is None or value == "None":
+ value = None
elif isinstance(value, int) and 1 <= value:
pass
+ elif isinstance(value, str) and value.isdigit():
+ value = int(value)
else:
- try:
- value = int(value)
- except ValueError:
- error_occurred = True
- else:
- if value < 1:
- error_occurred = True
-
- if error_occurred:
- raise click.UsageError("n-processes can either be an integer >= 1 or 'auto'.")
+ raise ValueError("n_processes can either be an integer >= 1, 'auto' or None.")
return value
def parallel_backend_callback(value):
+ """Validate the input for the parallel backend."""
+ if value == "None":
+ value = None
if value not in ["processes", "threads", None]:
- raise click.UsageError("parallel_backend has to be 'processes' or 'threads'.")
+ raise ValueError("parallel_backend has to be 'processes' or 'threads'.")
return value
-def delay_click_callback(ctx, name, value): # noqa: U100
- return delay_callback(value)
-
-
def delay_callback(value):
- error_occurred = False
- if isinstance(value, float) and 0 < value:
- pass
- elif value is None:
- pass
+ """Validate the delay option."""
+ if value is None or value == "None":
+ value = None
else:
try:
value = float(value)
except ValueError:
- error_occurred = True
- else:
- if value < 0:
- error_occurred = True
+ pass
- if error_occurred:
- raise click.UsageError("delay has to be a number greater than 0.")
+ if not (isinstance(value, float) and value > 0):
+ raise ValueError("delay has to be a number greater than 0.")
return value
diff --git a/src/pytask_parallel/config.py b/src/pytask_parallel/config.py
index e06050f..6e64004 100644
--- a/src/pytask_parallel/config.py
+++ b/src/pytask_parallel/config.py
@@ -1,14 +1,17 @@
+"""Configure pytask."""
import os
from _pytask.config import hookimpl
-from _pytask.shared import get_first_not_none_value
+from _pytask.shared import get_first_non_none_value
+from pytask_parallel.callbacks import delay_callback
from pytask_parallel.callbacks import n_workers_callback
from pytask_parallel.callbacks import parallel_backend_callback
@hookimpl
def pytask_parse_config(config, config_from_cli, config_from_file):
- config["n_workers"] = get_first_not_none_value(
+ """Parse the configuration."""
+ config["n_workers"] = get_first_non_none_value(
config_from_cli,
config_from_file,
key="n_workers",
@@ -18,11 +21,15 @@ def pytask_parse_config(config, config_from_cli, config_from_file):
if config["n_workers"] == "auto":
config["n_workers"] = max(os.cpu_count() - 1, 1)
- config["delay"] = get_first_not_none_value(
- config_from_cli, config_from_file, key="delay", default=0.1, callback=float
+ config["delay"] = get_first_non_none_value(
+ config_from_cli,
+ config_from_file,
+ key="delay",
+ default=0.1,
+ callback=delay_callback,
)
- config["parallel_backend"] = get_first_not_none_value(
+ config["parallel_backend"] = get_first_non_none_value(
config_from_cli,
config_from_file,
key="parallel_backend",
diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py
index 57123e8..0efbf27 100644
--- a/src/pytask_parallel/execute.py
+++ b/src/pytask_parallel/execute.py
@@ -17,6 +17,7 @@
@hookimpl
def pytask_post_parse(config):
+ """Register the parallel backend."""
if config["parallel_backend"] == "processes":
config["pm"].register(ProcessesNameSpace)
elif config["parallel_backend"] == "threads":
@@ -25,6 +26,7 @@ def pytask_post_parse(config):
@hookimpl(tryfirst=True)
def pytask_execute_create_scheduler(session):
+ """Create the scheduler."""
if session.config["n_workers"] > 1:
task_names = {task.name for task in session.tasks}
task_dict = {
@@ -41,6 +43,16 @@ def pytask_execute_create_scheduler(session):
@hookimpl(tryfirst=True)
def pytask_execute_build(session):
+ """Execute tasks with a parallel backend.
+
+ There are three phases while the scheduler has tasks which need to be executed.
+
+ 1. Take all ready tasks, set up their execution and submit them.
+ 2. For all tasks which are running, find those which have finished and turn them
+ into a report.
+ 3. Process all reports and report the result on the command line.
+
+ """
if session.config["n_workers"] > 1:
reports = []
running_tasks = {}
@@ -124,12 +136,23 @@ def pytask_execute_build(session):
class ProcessesNameSpace:
@hookimpl(tryfirst=True)
def pytask_execute_task(session, task): # noqa: N805
+ """Execute a task.
+
+ Take a task, pickle it and send the bytes over to another process.
+
+ """
if session.config["n_workers"] > 1:
bytes_ = cloudpickle.dumps(task)
return session.executor.submit(unserialize_and_execute_task, bytes_)
def unserialize_and_execute_task(bytes_):
+ """Unserialize and execute task.
+
+ This function receives bytes and unpickles them to a task which is them execute in a
+ spawned process or thread.
+
+ """
task = cloudpickle.loads(bytes_)
task.execute()
@@ -137,5 +160,11 @@ def unserialize_and_execute_task(bytes_):
class ThreadsNameSpace:
@hookimpl(tryfirst=True)
def pytask_execute_task(session, task): # noqa: N805
+ """Execute a task.
+
+ Since threads share their memory, it is not necessary to pickle and unpickle the
+ task.
+
+ """
if session.config["n_workers"] > 1:
return session.executor.submit(task.execute)
diff --git a/src/pytask_parallel/plugin.py b/src/pytask_parallel/plugin.py
index 878353c..eb1f779 100644
--- a/src/pytask_parallel/plugin.py
+++ b/src/pytask_parallel/plugin.py
@@ -1,11 +1,13 @@
+"""Entry-point for the plugin."""
from _pytask.config import hookimpl
-from pytask_parallel import cli
+from pytask_parallel import build
from pytask_parallel import config
from pytask_parallel import execute
@hookimpl
def pytask_add_hooks(pm):
- pm.register(cli)
+ """Register plugins."""
+ pm.register(build)
pm.register(config)
pm.register(execute)
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..3a17662
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,7 @@
+import pytest
+from click.testing import CliRunner
+
+
+@pytest.fixture()
+def runner():
+ return CliRunner()
diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py
index 2b2fc7e..7baf356 100644
--- a/tests/test_callbacks.py
+++ b/tests/test_callbacks.py
@@ -1,36 +1,29 @@
-import functools
from contextlib import ExitStack as does_not_raise # noqa: N813
-import click
import pytest
from pytask_parallel.callbacks import delay_callback
-from pytask_parallel.callbacks import delay_click_callback
from pytask_parallel.callbacks import n_workers_callback
-from pytask_parallel.callbacks import n_workers_click_callback
from pytask_parallel.callbacks import parallel_backend_callback
-partialed_n_workers_callback = functools.partial(
- n_workers_click_callback, ctx=None, name=None
-)
-
-
@pytest.mark.unit
@pytest.mark.parametrize(
"value, expectation",
[
- (0, pytest.raises(click.UsageError)),
+ (0, pytest.raises(ValueError)),
(1, does_not_raise()),
(2, does_not_raise()),
("auto", does_not_raise()),
- ("asdad", pytest.raises(click.UsageError)),
+ ("asdad", pytest.raises(ValueError)),
(None, does_not_raise()),
+ ("None", does_not_raise()),
+ ("1", does_not_raise()),
+ ("1.1", pytest.raises(ValueError)),
],
)
-@pytest.mark.parametrize("func", [n_workers_callback, partialed_n_workers_callback])
-def test_n_workers_callback(func, value, expectation):
+def test_n_workers_callback(value, expectation):
with expectation:
- func(value=value)
+ n_workers_callback(value)
@pytest.mark.unit
@@ -39,9 +32,10 @@ def test_n_workers_callback(func, value, expectation):
[
("threads", does_not_raise()),
("processes", does_not_raise()),
- (1, pytest.raises(click.UsageError)),
- ("asdad", pytest.raises(click.UsageError)),
+ (1, pytest.raises(ValueError)),
+ ("asdad", pytest.raises(ValueError)),
(None, does_not_raise()),
+ ("None", does_not_raise()),
],
)
def test_parallel_backend_callback(value, expectation):
@@ -49,21 +43,18 @@ def test_parallel_backend_callback(value, expectation):
parallel_backend_callback(value)
-partialed_delay_callback = functools.partial(delay_click_callback, ctx=None, name=None)
-
-
@pytest.mark.unit
@pytest.mark.parametrize(
"value, expectation",
[
- (-1, pytest.raises(click.UsageError)),
+ (-1, pytest.raises(ValueError)),
(0.1, does_not_raise()),
(1, does_not_raise()),
- ("asdad", pytest.raises(click.UsageError)),
+ ("asdad", pytest.raises(ValueError)),
(None, does_not_raise()),
+ ("None", does_not_raise()),
],
)
-@pytest.mark.parametrize("func", [delay_callback, partialed_delay_callback])
-def test_delay_callback(func, value, expectation):
+def test_delay_callback(value, expectation):
with expectation:
- func(value=value)
+ delay_callback(value)
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 26f90aa..c184155 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -1,11 +1,12 @@
-import subprocess
import textwrap
+from time import time
import pytest
+from pytask import cli
@pytest.mark.end_to_end
-def test_delay_via_cli(tmp_path):
+def test_delay_via_cli(runner, tmp_path):
source = """
import pytask
@@ -15,4 +16,9 @@ def task_1(produces):
"""
tmp_path.joinpath("task_dummy.py").write_text(textwrap.dedent(source))
- subprocess.run(["pytask", tmp_path.as_posix(), "-n", "2", "--delay", "5"])
+ start = time()
+ result = runner.invoke(cli, [tmp_path.as_posix(), "-n", "2", "--delay", "5"])
+ end = time()
+
+ assert result.exit_code == 0
+ assert end - start > 5
diff --git a/tests/test_config.py b/tests/test_config.py
index e1180c8..f8fdf59 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -24,31 +24,29 @@ def test_interplay_between_debugging_and_parallel(tmp_path, pdb, n_workers, expe
@pytest.mark.end_to_end
@pytest.mark.parametrize("config_file", ["pytask.ini", "tox.ini", "setup.cfg"])
@pytest.mark.parametrize(
- "name, value",
+ "name, value, exit_code",
[
- ("n_workers", "auto"),
- ("n_workers", 1),
- ("n_workers", 2),
- ("delay", 0.1),
- ("delay", 1),
- ("parallel_backend", "threads"),
- ("parallel_backend", "processes"),
- ("parallel_backend", "unknown_backend"),
+ ("n_workers", "auto", 0),
+ ("n_workers", 1, 0),
+ ("n_workers", 2, 0),
+ ("delay", 0.1, 0),
+ ("delay", 1, 0),
+ ("parallel_backend", "threads", 0),
+ ("parallel_backend", "processes", 0),
+ ("parallel_backend", "unknown_backend", 2),
],
)
-def test_reading_values_from_config_file(tmp_path, config_file, name, value):
+def test_reading_values_from_config_file(tmp_path, config_file, name, value, exit_code):
config = f"""
[pytask]
{name} = {value}
"""
tmp_path.joinpath(config_file).write_text(textwrap.dedent(config))
- try:
- session = main({"paths": tmp_path})
- except Exception as e:
- assert "Error while configuring pytask" in str(e)
- else:
- assert session.exit_code == 0
- if value == "auto":
- value = os.cpu_count() - 1
+ session = main({"paths": tmp_path})
+
+ assert session.exit_code == exit_code
+ if value == "auto":
+ value = os.cpu_count() - 1
+ if session.exit_code == 0:
assert session.config[name] == value
diff --git a/tests/test_execute.py b/tests/test_execute.py
index 7eeb836..4b0ab14 100644
--- a/tests/test_execute.py
+++ b/tests/test_execute.py
@@ -2,8 +2,10 @@
import textwrap
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
+from time import time
import pytest
+from pytask import cli
from pytask import main
from pytask_parallel.execute import ProcessesNameSpace
from pytask_parallel.execute import ThreadsNameSpace
@@ -42,6 +44,7 @@ def task_2(produces):
session = main({"paths": tmp_path})
+ assert session.exit_code == 0
assert session.execution_end - session.execution_start > 10
tmp_path.joinpath("out_1.txt").unlink()
@@ -51,7 +54,54 @@ def task_2(produces):
{"paths": tmp_path, "n_workers": 2, "parallel_backend": parallel_backend}
)
- assert session.execution_end - session.execution_start < 12
+ assert session.exit_code == 0
+ assert session.execution_end - session.execution_start < 10
+
+
+@pytest.mark.end_to_end
+@pytest.mark.parametrize("parallel_backend", ["processes", "threads"])
+def test_parallel_execution_speedup_w_cli(runner, tmp_path, parallel_backend):
+ source = """
+ import pytask
+ import time
+
+ @pytask.mark.produces("out_1.txt")
+ def task_1(produces):
+ time.sleep(5)
+ produces.write_text("1")
+
+ @pytask.mark.produces("out_2.txt")
+ def task_2(produces):
+ time.sleep(5)
+ produces.write_text("2")
+ """
+ tmp_path.joinpath("task_dummy.py").write_text(textwrap.dedent(source))
+
+ start = time()
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ end = time()
+
+ assert result.exit_code == 0
+ assert end - start > 10
+
+ tmp_path.joinpath("out_1.txt").unlink()
+ tmp_path.joinpath("out_2.txt").unlink()
+
+ start = time()
+ result = runner.invoke(
+ cli,
+ [
+ tmp_path.as_posix(),
+ "--n-workers",
+ "2",
+ "--parallel-backend",
+ parallel_backend,
+ ],
+ )
+ end = time()
+
+ assert result.exit_code == 0
+ assert end - start < 10
@pytest.mark.integration
diff --git a/tox.ini b/tox.ini
index f2619ba..6dcbc15 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,7 +9,7 @@ basepython = python
[testenv:pytest]
conda_deps =
cloudpickle
- pytask >=0.0.5
+ pytask >=0.0.6
pytest
pytest-cov
pytest-xdist
@@ -38,6 +38,7 @@ ignore =
max-line-length = 88
warn-symbols =
pytest.mark.wip = Remove 'wip' mark for tests.
+ pytest.mark.skip = Remove 'skip' flag for tests.
[pytest]
addopts = --doctest-modules
@@ -57,6 +58,3 @@ markers =
norecursedirs =
.idea
.tox
-warn-symbols =
- pytest.mark.wip = Remove 'wip' flag for tests.
- pytest.mark.skip = Remove 'skip' flag for tests.