diff --git a/docs/source/changes.md b/docs/source/changes.md index 60c38125..fc9fe890 100644 --- a/docs/source/changes.md +++ b/docs/source/changes.md @@ -20,6 +20,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and - {pull}`566` makes universal-pathlib an official dependency. - {pull}`568` restricts `task_files` to a list of patterns and raises a better error. - {pull}`569` removes the hooks related to the creation of the DAG. +- {pull}`571` removes redundant calls to `PNode.state()` which causes a high penalty for + remote files. ## 0.4.5 - 2024-01-09 diff --git a/src/_pytask/database_utils.py b/src/_pytask/database_utils.py index 0d20e0ae..dc262967 100644 --- a/src/_pytask/database_utils.py +++ b/src/_pytask/database_utils.py @@ -69,11 +69,10 @@ def update_states_in_database(session: Session, task_signature: str) -> None: _create_or_update_state(task_signature, node.signature, hash_) -def has_node_changed(task: PTask, node: PTask | PNode) -> bool: +def has_node_changed(task: PTask, node: PTask | PNode, state: str | None) -> bool: """Indicate whether a single dependency or product has changed.""" # If node does not exist, we receive None. - node_state = node.state() - if node_state is None: + if state is None: return True with DatabaseSession() as session: @@ -83,4 +82,4 @@ def has_node_changed(task: PTask, node: PTask | PNode) -> bool: if db_state is None: return True - return node_state != db_state.hash_ + return state != db_state.hash_ diff --git a/src/_pytask/execute.py b/src/_pytask/execute.py index 08c4cdaa..23329a10 100644 --- a/src/_pytask/execute.py +++ b/src/_pytask/execute.py @@ -140,7 +140,8 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None: node = dag.nodes[node_signature].get("task") or dag.nodes[ node_signature ].get("node") - if node_signature in predecessors and not node.state(): + node_state = node.state() + if node_signature in predecessors and not node_state: msg = f"{task.name!r} requires missing node {node.name!r}." if IS_FILE_SYSTEM_CASE_SENSITIVE: msg += ( @@ -149,7 +150,7 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None: ) raise NodeNotFoundError(msg) - has_changed = has_node_changed(task=task, node=node) + has_changed = has_node_changed(task=task, node=node, state=node_state) if has_changed: needs_to_be_executed = True break diff --git a/src/_pytask/nodes.py b/src/_pytask/nodes.py index e98a0f6d..80697feb 100644 --- a/src/_pytask/nodes.py +++ b/src/_pytask/nodes.py @@ -5,6 +5,7 @@ import hashlib import inspect import pickle +from contextlib import suppress from os import stat_result from pathlib import Path # noqa: TCH003 from typing import TYPE_CHECKING @@ -75,12 +76,10 @@ def signature(self) -> str: def state(self) -> str | None: """Return the state of the node.""" - try: + with suppress(OSError): source = inspect.getsource(self.function) - except OSError: - return None - else: return hashlib.sha256(source.encode()).hexdigest() + return None def execute(self, **kwargs: Any) -> Any: """Execute the task.""" @@ -137,10 +136,7 @@ def signature(self) -> str: def state(self) -> str | None: """Return the state of the node.""" - if self.path.exists(): - modification_time = self.path.stat().st_mtime - return hash_path(self.path, modification_time) - return None + return _get_state(self.path) def execute(self, **kwargs: Any) -> Any: """Execute the task.""" @@ -180,9 +176,7 @@ def state(self) -> str | None: The state is given by the modification timestamp. """ - if self.path.exists(): - return _get_state(self.path) - return None + return _get_state(self.path) def load(self, is_product: bool = False) -> Path: # noqa: ARG002 """Load the value.""" @@ -316,9 +310,7 @@ def from_path(cls, path: Path) -> PickleNode: return cls(name=path.as_posix(), path=path) def state(self) -> str | None: - if self.path.exists(): - return _get_state(self.path) - return None + return _get_state(self.path) def load(self, is_product: bool = False) -> Any: if is_product: @@ -331,15 +323,19 @@ def save(self, value: Any) -> None: pickle.dump(value, f) -def _get_state(path: Path) -> str: +def _get_state(path: Path) -> str | None: """Get state of a path. A simple function to handle local and remote files. """ - stat = path.stat() + try: + stat = path.stat() + except FileNotFoundError: + return None + if isinstance(stat, stat_result): - modification_time = path.stat().st_mtime + modification_time = stat.st_mtime return hash_path(path, modification_time) if isinstance(stat, UPathStatResult): return stat.as_info().get("ETag", "0") diff --git a/src/_pytask/persist.py b/src/_pytask/persist.py index d4a56c25..ebbfbca5 100644 --- a/src/_pytask/persist.py +++ b/src/_pytask/persist.py @@ -39,12 +39,13 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None: """ if has_mark(task, "persist"): - all_nodes_exist = all( + all_states = [ ( session.dag.nodes[name].get("task") or session.dag.nodes[name]["node"] ).state() for name in node_and_neighbors(session.dag, task.signature) - ) + ] + all_nodes_exist = all(all_states) if all_nodes_exist: any_node_changed = any( @@ -52,8 +53,11 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None: task=task, node=session.dag.nodes[name].get("task") or session.dag.nodes[name]["node"], + state=state, + ) + for name, state in zip( + node_and_neighbors(session.dag, task.signature), all_states ) - for name in node_and_neighbors(session.dag, task.signature) ) if any_node_changed: raise Persisted