-
Notifications
You must be signed in to change notification settings - Fork 11
ENH: Allow registering a different pickling backend #669
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
As a quick fix, you can create your own If so, adding serializer and deserializer functions as attributes on the |
Thanks for the quick reply and idea! Apologies for the slow response. I actually did the copy & paste version instead of trying to subclass. Can confirm that adding this code: import hashlib
import cloudpickle
from attrs import define
from os import stat_result
from _pytask.path import hash_path
from pytask import PPathNode, hash_value
from upath import UPath
from upath._stat import UPathStatResult
@define
class CloudpickleNode(PPathNode):
"""A node for pickle files.
Attributes:
----------
name
Name of the node which makes it identifiable in the DAG.
path
The path to the file.
"""
path: Path
name: str = ""
@property
def signature(self) -> str:
"""The unique signature of the node."""
raw_key = str(hash_value(self.path))
return hashlib.sha256(raw_key.encode()).hexdigest()
@classmethod
def from_path(cls, path: Path) -> CloudpickleNode:
"""Instantiate class from path to file."""
if not path.is_absolute():
msg = "Node must be instantiated from absolute path."
raise ValueError(msg)
return cls(name=path.as_posix(), path=path)
def state(self) -> str | None:
return _get_state(self.path)
def load(self, is_product: bool = False) -> Any:
if is_product:
return self
with self.path.open("rb") as f:
return cloudpickle.load(f)
def save(self, value: Any) -> None:
with self.path.open("wb") as f:
cloudpickle.dump(value, f)
def _get_state(path: Path) -> str | None:
"""Get state of a path.
A simple function to handle local and remote files.
"""
# Invalidate the cache of the path if it is a UPath because it might have changed in
# a different process with pytask-parallel and the main process does not know about
# it and relies on the cache.
if isinstance(path, UPath):
path.fs.invalidate_cache()
try:
stat = path.stat()
except FileNotFoundError:
return None
if isinstance(stat, stat_result):
modification_time = stat.st_mtime
return hash_path(path, modification_time)
if isinstance(stat, UPathStatResult):
return stat.as_info().get("ETag", "0")
msg = "Unknown stat object."
raise NotImplementedError(msg) and changing the def task(...) -> Annotated[CloudpickleNode, data_catalog["parameters_for_estimation"]]:
...
return em.estimate_msm(...) works! The only changes to PickleNode are the two lines with |
Is your feature request related to a problem?
Running pytask in a project with optimagic & LCM left a colleague baffled by the error reproduced below. For debugging it, some developer (well, reviewer) knowledge of the three libraries was certainly helpful.
Reason is that these libraries require pickling functions that the built-in
pickle
cannot handle, so optimagic uses cloudpickle internally. So it is possible to work around this by doing the pickling / unpickling manually, but it would be nice to just continue using the pytask data catalogue abstraction directly.Describe the solution you'd like
Some stable mechanism to specify the backend to in
pytask.PNode.load
/.save
.I guess subclassing would do the trick, but maybe it is common enough to support it project-wide?
The text was updated successfully, but these errors were encountered: