Skip to content

Remove redundant calls of PNode.state(). #571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
- {pull}`566` makes universal-pathlib an official dependency.
- {pull}`568` restricts `task_files` to a list of patterns and raises a better error.
- {pull}`569` removes the hooks related to the creation of the DAG.
- {pull}`571` removes redundant calls to `PNode.state()`, which cause a high penalty for
remote files.

## 0.4.5 - 2024-01-09

Expand Down
7 changes: 3 additions & 4 deletions src/_pytask/database_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ def update_states_in_database(session: Session, task_signature: str) -> None:
_create_or_update_state(task_signature, node.signature, hash_)


def has_node_changed(task: PTask, node: PTask | PNode) -> bool:
def has_node_changed(task: PTask, node: PTask | PNode, state: str | None) -> bool:
"""Indicate whether a single dependency or product has changed."""
# If node does not exist, we receive None.
node_state = node.state()
if node_state is None:
if state is None:
return True

with DatabaseSession() as session:
Expand All @@ -83,4 +82,4 @@ def has_node_changed(task: PTask, node: PTask | PNode) -> bool:
if db_state is None:
return True

return node_state != db_state.hash_
return state != db_state.hash_
5 changes: 3 additions & 2 deletions src/_pytask/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None:
node = dag.nodes[node_signature].get("task") or dag.nodes[
node_signature
].get("node")
if node_signature in predecessors and not node.state():
node_state = node.state()
if node_signature in predecessors and not node_state:
msg = f"{task.name!r} requires missing node {node.name!r}."
if IS_FILE_SYSTEM_CASE_SENSITIVE:
msg += (
Expand All @@ -149,7 +150,7 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None:
)
raise NodeNotFoundError(msg)

has_changed = has_node_changed(task=task, node=node)
has_changed = has_node_changed(task=task, node=node, state=node_state)
if has_changed:
needs_to_be_executed = True
break
Expand Down
30 changes: 13 additions & 17 deletions src/_pytask/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import hashlib
import inspect
import pickle
from contextlib import suppress
from os import stat_result
from pathlib import Path # noqa: TCH003
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -75,12 +76,10 @@ def signature(self) -> str:

def state(self) -> str | None:
"""Return the state of the node."""
try:
with suppress(OSError):
source = inspect.getsource(self.function)
except OSError:
return None
else:
return hashlib.sha256(source.encode()).hexdigest()
return None

def execute(self, **kwargs: Any) -> Any:
"""Execute the task."""
Expand Down Expand Up @@ -137,10 +136,7 @@ def signature(self) -> str:

def state(self) -> str | None:
"""Return the state of the node."""
if self.path.exists():
modification_time = self.path.stat().st_mtime
return hash_path(self.path, modification_time)
return None
return _get_state(self.path)

def execute(self, **kwargs: Any) -> Any:
"""Execute the task."""
Expand Down Expand Up @@ -180,9 +176,7 @@ def state(self) -> str | None:
The state is given by the modification timestamp.

"""
if self.path.exists():
return _get_state(self.path)
return None
return _get_state(self.path)

def load(self, is_product: bool = False) -> Path: # noqa: ARG002
"""Load the value."""
Expand Down Expand Up @@ -316,9 +310,7 @@ def from_path(cls, path: Path) -> PickleNode:
return cls(name=path.as_posix(), path=path)

def state(self) -> str | None:
if self.path.exists():
return _get_state(self.path)
return None
return _get_state(self.path)

def load(self, is_product: bool = False) -> Any:
if is_product:
Expand All @@ -331,15 +323,19 @@ def save(self, value: Any) -> None:
pickle.dump(value, f)


def _get_state(path: Path) -> str:
def _get_state(path: Path) -> str | None:
"""Get state of a path.

A simple function to handle local and remote files.

"""
stat = path.stat()
try:
stat = path.stat()
except FileNotFoundError:
return None

if isinstance(stat, stat_result):
modification_time = path.stat().st_mtime
modification_time = stat.st_mtime
return hash_path(path, modification_time)
if isinstance(stat, UPathStatResult):
return stat.as_info().get("ETag", "0")
Expand Down
10 changes: 7 additions & 3 deletions src/_pytask/persist.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,25 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None:

"""
if has_mark(task, "persist"):
all_nodes_exist = all(
all_states = [
(
session.dag.nodes[name].get("task") or session.dag.nodes[name]["node"]
).state()
for name in node_and_neighbors(session.dag, task.signature)
)
]
all_nodes_exist = all(all_states)

if all_nodes_exist:
any_node_changed = any(
has_node_changed(
task=task,
node=session.dag.nodes[name].get("task")
or session.dag.nodes[name]["node"],
state=state,
)
for name, state in zip(
node_and_neighbors(session.dag, task.signature), all_states
)
for name in node_and_neighbors(session.dag, task.signature)
)
if any_node_changed:
raise Persisted
Expand Down