From 235fffc0be93ac912fbec538dc59e35c87909294 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Wed, 6 Aug 2025 12:15:25 +0200 Subject: [PATCH 01/25] wip --- dataframely/_serialization.py | 81 ++++++++++++++++++++++++++++++++++- dataframely/schema.py | 71 ++++++++++++++++++++---------- 2 files changed, 128 insertions(+), 24 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index d17bb63e..570ebeab 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -1,9 +1,9 @@ # Copyright (c) QuantCo 2025-2025 # SPDX-License-Identifier: BSD-3-Clause - import base64 import datetime as dt import decimal +from abc import ABC, abstractmethod from io import BytesIO from json import JSONDecoder, JSONEncoder from typing import Any, cast @@ -112,3 +112,82 @@ def object_hook(self, dct: dict[str, Any]) -> Any: ) case _: raise TypeError(f"Unknown type '{dct['__type__']}' in JSON data.") + + +SerializedSchema = str + + +class DataFramelyIO(ABC): + # --------------------------- Individual tables ------------------------------------ + @abstractmethod + def sink_table( + self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any + ) -> None: ... + + @abstractmethod + def write_table( + self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any + ) -> None: ... + + @abstractmethod + def scan_table( + self, **kwargs: Any + ) -> tuple[pl.LazyFrame, SerializedSchema | None]: ... + + @abstractmethod + def read_table( + self, **kwargs: Any + ) -> tuple[pl.DataFrame, SerializedSchema | None]: ... + + # # --------------------------- Collections ------------------------------------ + # @abstractmethod + # def sink_collection(self, dfs: dict[str, pl.LazyFrame], collection_metadata: MetaData, table_metadata: dict[str, MetaData], *args: WriteArgs.args, **kwargs: WriteArgs.kwargs) -> None: + # ... + # + # @abstractmethod + # def write_collection(self, dfs: dict[str, pl.DataFrame], collection_metadata: MetaData, table_metadata: dict[str, MetaData], *args: WriteArgs.args, **kwargs: WriteArgs.kwargs) -> None: + # ... + # + # @abstractmethod + # def scan_collection(self, *args: ReadArgs.args, **kwargs: ReadArgs.kwargs) -> tuple[dict[str, pl.LazyFrame], MetaData]: + # ... + # + # @abstractmethod + # def read_collection(self, *args: ReadArgs.args, **kwargs: ReadArgs.kwargs) -> tuple[dict[str, pl.DataFrame], MetaData]: + # ... 
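For orientation, a minimal sketch of what a second backend could look like against this interface. This is illustrative only and not part of the patch: the ``InMemoryIO`` name, its ``key`` keyword argument, and the dict-based storage are assumptions made for the example.

    class InMemoryIO(DataFramelyIO):
        """Illustrative backend that keeps frames and their serialized schemas in memory."""

        def __init__(self) -> None:
            self._store: dict[str, tuple[pl.DataFrame, SerializedSchema]] = {}

        def sink_table(
            self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any
        ) -> None:
            # No true streaming here; collect and delegate to the eager path.
            self.write_table(lf.collect(), serialized_schema, **kwargs)

        def write_table(
            self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any
        ) -> None:
            self._store[kwargs.pop("key")] = (df, serialized_schema)

        def scan_table(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]:
            df, schema = self._store[kwargs.pop("key")]
            return df.lazy(), schema

        def read_table(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]:
            df, schema = self._store[kwargs.pop("key")]
            return df, schema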
+ + +class ParquetIO(DataFramelyIO): + def sink_table( + self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any + ) -> None: + file = kwargs.pop("file") + metadata = kwargs.pop("metadata", {}) + lf.sink_parquet( + file, + metadata={**metadata, SCHEMA_METADATA_KEY: serialized_schema}, + **kwargs, + ) + + def write_table( + self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any + ) -> None: + file = kwargs.pop("file") + metadata = kwargs.pop("metadata", {}) + df.write_parquet( + file, + metadata={**metadata, SCHEMA_METADATA_KEY: serialized_schema}, + **kwargs, + ) + + def scan_table(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: + source = kwargs.pop("source") + lf = pl.scan_parquet(source, **kwargs) + metadata = pl.read_parquet_metadata(source).get(SCHEMA_METADATA_KEY) + return lf, metadata + + def read_table(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: + source = kwargs.pop("source") + df = pl.read_parquet(source, **kwargs) + metadata = pl.read_parquet_metadata(source).get(SCHEMA_METADATA_KEY) + return df, metadata diff --git a/dataframely/schema.py b/dataframely/schema.py index c6cd78ab..0e86e0fe 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -22,6 +22,8 @@ from ._serialization import ( SCHEMA_METADATA_KEY, SERIALIZATION_FORMAT_VERSION, + DataFramelyIO, + ParquetIO, SchemaJSONDecoder, SchemaJSONEncoder, serialization_versions, @@ -713,10 +715,7 @@ def write_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - metadata = kwargs.pop("metadata", {}) - df.write_parquet( - file, metadata={**metadata, SCHEMA_METADATA_KEY: cls.serialize()}, **kwargs - ) + cls._write(df=df, io=ParquetIO(), file=file, **kwargs) @classmethod def sink_parquet( @@ -744,10 +743,7 @@ def sink_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - metadata = kwargs.pop("metadata", {}) - lf.sink_parquet( - file, metadata={**metadata, SCHEMA_METADATA_KEY: cls.serialize()}, **kwargs - ) + return cls._sink(lf=lf, io=ParquetIO(), file=file, **kwargs) @classmethod def read_parquet( @@ -796,9 +792,7 @@ def read_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - if not cls._requires_validation_for_reading_parquet(source, validation): - return pl.read_parquet(source, **kwargs) # type: ignore - return cls.validate(pl.read_parquet(source, **kwargs), cast=True) + return cls._read(ParquetIO(), validation=validation, source=source, **kwargs) @classmethod def scan_parquet( @@ -852,13 +846,14 @@ def scan_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - if not cls._requires_validation_for_reading_parquet(source, validation): - return pl.scan_parquet(source, **kwargs) # type: ignore - return cls.validate(pl.read_parquet(source, **kwargs), cast=True).lazy() + return cls._scan(ParquetIO(), validation=validation, source=source, **kwargs) @classmethod def _requires_validation_for_reading_parquet( - cls, source: FileSource, validation: Validation + cls, + deserialized_schema: type[Schema] | None, + validation: Validation, + source: str, ) -> bool: if validation == "skip": return False @@ -866,20 +861,16 @@ def _requires_validation_for_reading_parquet( # First, we check whether the source provides the dataframely schema. If it # does, we check whether it matches this schema. 
If it does, we assume that the # data adheres to the schema and we do not need to run validation. - serialized_schema = ( - read_parquet_metadata_schema(source) - if not isinstance(source, list) - else None - ) - if serialized_schema is not None: - if cls.matches(serialized_schema): + + if deserialized_schema is not None: + if cls.matches(deserialized_schema): return False # Otherwise, we definitely need to run validation. However, we emit different # information to the user depending on the value of `validate`. msg = ( "current schema does not match stored schema" - if serialized_schema is not None + if deserialized_schema is not None else "no schema to check validity can be read from the source" ) if validation == "forbid": @@ -892,6 +883,40 @@ def _requires_validation_for_reading_parquet( ) return True + # ------------------------------------- IO --------------------------------------- # + @classmethod + def _write(cls, df: pl.DataFrame, io: DataFramelyIO, **kwargs: Any) -> None: + io.write_table(df=df, serialized_schema=cls.serialize(), **kwargs) + + @classmethod + def _sink(cls, lf: pl.LazyFrame, io: DataFramelyIO, **kwargs: Any) -> None: + io.sink_table(lf=lf, serialized_schema=cls.serialize(), **kwargs) + + @classmethod + def _scan( + cls, io: DataFramelyIO, validation: Validation, **kwargs: Any + ) -> LazyFrame[Self]: + source = kwargs.pop("source") + + # Load + df, serialized_schema = io.scan_table(source=source) + deserialized_schema = ( + deserialize_schema(serialized_schema) if serialized_schema else None + ) + + # Smart validation + if cls._requires_validation_for_reading_parquet( + deserialized_schema, validation, source=str(source) + ): + return cls.validate(df, cast=True).lazy() + return cls.cast(df) + + @classmethod + def _read( + cls, io: DataFramelyIO, validation: Validation, **kwargs: Any + ) -> DataFrame[Self]: + return cls._scan(io=io, validation=validation, **kwargs).collect() + # ----------------------------- THIRD-PARTY PACKAGES ----------------------------- # @classmethod From 0e85a161187e321086336ea52a353d656ff204ef Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Wed, 6 Aug 2025 13:57:54 +0200 Subject: [PATCH 02/25] wip --- dataframely/_serialization.py | 80 ++++++++++++++++++++++++++++++---- dataframely/collection.py | 81 ++++++++++++++++++++++------------- 2 files changed, 124 insertions(+), 37 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index 570ebeab..dbcf4f4b 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -6,6 +6,7 @@ from abc import ABC, abstractmethod from io import BytesIO from json import JSONDecoder, JSONEncoder +from pathlib import Path from typing import Any, cast import polars as pl @@ -115,6 +116,7 @@ def object_hook(self, dct: dict[str, Any]) -> Any: SerializedSchema = str +SerializedCollection = str class DataFramelyIO(ABC): @@ -140,14 +142,24 @@ def read_table( ) -> tuple[pl.DataFrame, SerializedSchema | None]: ... # # --------------------------- Collections ------------------------------------ - # @abstractmethod - # def sink_collection(self, dfs: dict[str, pl.LazyFrame], collection_metadata: MetaData, table_metadata: dict[str, MetaData], *args: WriteArgs.args, **kwargs: WriteArgs.kwargs) -> None: - # ... - # - # @abstractmethod - # def write_collection(self, dfs: dict[str, pl.DataFrame], collection_metadata: MetaData, table_metadata: dict[str, MetaData], *args: WriteArgs.args, **kwargs: WriteArgs.kwargs) -> None: - # ... 
- # + @abstractmethod + def sink_collection( + self, + dfs: dict[str, pl.LazyFrame], + serialized_collection: SerializedCollection, + serialized_schemas: dict[str, str], + **kwargs: Any, + ) -> None: ... + + @abstractmethod + def write_collection( + self, + dfs: dict[str, pl.LazyFrame], + serialized_collection: SerializedCollection, + serialized_schemas: dict[str, str], + **kwargs: Any, + ) -> None: ... + # @abstractmethod # def scan_collection(self, *args: ReadArgs.args, **kwargs: ReadArgs.kwargs) -> tuple[dict[str, pl.LazyFrame], MetaData]: # ... @@ -158,6 +170,7 @@ def read_table( class ParquetIO(DataFramelyIO): + # --------------------------- Schema ----------------------------------------------- def sink_table( self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: @@ -191,3 +204,54 @@ def read_table(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | No df = pl.read_parquet(source, **kwargs) metadata = pl.read_parquet_metadata(source).get(SCHEMA_METADATA_KEY) return df, metadata + + # ----------------------------- Collection ----------------------------------------- + def sink_collection( + self, + dfs: dict[str, pl.LazyFrame], + serialized_collection: SerializedCollection, + serialized_schemas: dict[str, str], + **kwargs: Any, + ) -> None: + path = Path(kwargs.pop("directory")) + + # The collection schema is serialized as part of the member parquet metadata + kwargs["metadata"] = kwargs.get("metadata", {}) | { + COLLECTION_METADATA_KEY: serialized_collection + } + + for key, lf in dfs.items(): + destination = ( + path / key if "partition_by" in kwargs else path / f"{key}.parquet" + ) + self.sink_table( + lf, + serialized_schema=serialized_schemas[key], + file=destination, + **kwargs, + ) + + def write_collection( + self, + dfs: dict[str, pl.LazyFrame], + serialized_collection: SerializedCollection, + serialized_schemas: dict[str, str], + **kwargs: Any, + ) -> None: + path = Path(kwargs.pop("directory")) + + # The collection schema is serialized as part of the member parquet metadata + kwargs["metadata"] = kwargs.get("metadata", {}) | { + COLLECTION_METADATA_KEY: serialized_collection + } + + for key, lf in dfs.items(): + destination = ( + path / key if "partition_by" in kwargs else path / f"{key}.parquet" + ) + self.sink_table( + lf, + serialized_schema=serialized_schemas[key], + file=destination, + **kwargs, + ) diff --git a/dataframely/collection.py b/dataframely/collection.py index 109488a5..fe231ae6 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -21,6 +21,8 @@ from ._serialization import ( COLLECTION_METADATA_KEY, SERIALIZATION_FORMAT_VERSION, + DataFramelyIO, + ParquetIO, SchemaJSONDecoder, SchemaJSONEncoder, serialization_versions, @@ -652,7 +654,7 @@ def write_parquet(self, directory: str | Path, **kwargs: Any) -> None: Attention: This method suffers from the same limitations as :meth:`Schema.serialize`. """ - self._to_parquet(directory, sink=False, **kwargs) + self._write(ParquetIO(), directory=directory) def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None: """Stream the members of this collection into parquet files in a directory. @@ -672,34 +674,35 @@ def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None: Attention: This method suffers from the same limitations as :meth:`Schema.serialize`. 
""" - self._to_parquet(directory, sink=True, **kwargs) - - def _to_parquet(self, directory: str | Path, *, sink: bool, **kwargs: Any) -> None: - path = Path(directory) if isinstance(directory, str) else directory - path.mkdir(parents=True, exist_ok=True) - - # The collection schema is serialized as part of the member parquet metadata - kwargs["metadata"] = kwargs.get("metadata", {}) | { - COLLECTION_METADATA_KEY: self.serialize() - } - - member_schemas = self.member_schemas() - for key, lf in self.to_dict().items(): - destination = ( - path / key if "partition_by" in kwargs else path / f"{key}.parquet" - ) - if sink: - member_schemas[key].sink_parquet( - lf, # type: ignore - destination, - **kwargs, - ) - else: - member_schemas[key].write_parquet( - lf.collect(), # type: ignore - destination, - **kwargs, - ) + self._sink(ParquetIO(), directory=directory) + + # + # def _to_parquet(self, directory: str | Path, *, sink: bool, **kwargs: Any) -> None: + # path = Path(directory) if isinstance(directory, str) else directory + # path.mkdir(parents=True, exist_ok=True) + # + # # The collection schema is serialized as part of the member parquet metadata + # kwargs["metadata"] = kwargs.get("metadata", {}) | { + # COLLECTION_METADATA_KEY: self.serialize() + # } + # + # member_schemas = self.member_schemas() + # for key, lf in self.to_dict().items(): + # destination = ( + # path / key if "partition_by" in kwargs else path / f"{key}.parquet" + # ) + # if sink: + # member_schemas[key].sink_parquet( + # lf, # type: ignore + # destination, + # **kwargs, + # ) + # else: + # member_schemas[key].write_parquet( + # lf.collect(), # type: ignore + # destination, + # **kwargs, + # ) @classmethod def read_parquet( @@ -862,6 +865,26 @@ def _from_parquet( return data, collection_type + def _write(self, io: DataFramelyIO, directory: Path | str) -> None: + io.write_collection( + self.to_dict(), + serialized_collection=self.serialize(), + serialized_schemas={ + key: schema.serialize() for key, schema in self.member_schemas().items() + }, + directory=directory, + ) + + def _sink(self, io: DataFramelyIO, directory: Path | str) -> None: + io.sink_collection( + self.to_dict(), + serialized_collection=self.serialize(), + serialized_schemas={ + key: schema.serialize() for key, schema in self.member_schemas().items() + }, + directory=directory, + ) + @classmethod def _member_source_path(cls, base_path: Path, name: str) -> Path | None: if (path := base_path / name).exists() and base_path.is_dir(): From c17bb9736148628320cee2bcd8eb08c07fa2e491 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Wed, 6 Aug 2025 14:42:51 +0200 Subject: [PATCH 03/25] wip --- dataframely/_serialization.py | 70 ++++++++++++++++++--- dataframely/collection.py | 115 +++++++++++++++++----------------- 2 files changed, 119 insertions(+), 66 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index dbcf4f4b..2e511d4a 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -4,6 +4,7 @@ import datetime as dt import decimal from abc import ABC, abstractmethod +from collections.abc import Iterable from io import BytesIO from json import JSONDecoder, JSONEncoder from pathlib import Path @@ -160,13 +161,15 @@ def write_collection( **kwargs: Any, ) -> None: ... - # @abstractmethod - # def scan_collection(self, *args: ReadArgs.args, **kwargs: ReadArgs.kwargs) -> tuple[dict[str, pl.LazyFrame], MetaData]: - # ... 
- # - # @abstractmethod - # def read_collection(self, *args: ReadArgs.args, **kwargs: ReadArgs.kwargs) -> tuple[dict[str, pl.DataFrame], MetaData]: - # ... + @abstractmethod + def scan_collection( + self, members: Iterable[str], **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: ... + + @abstractmethod + def read_collection( + self, members: Iterable[str], **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: ... class ParquetIO(DataFramelyIO): @@ -255,3 +258,56 @@ def write_collection( file=destination, **kwargs, ) + + def scan_collection( + self, members: Iterable[str], **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + path = Path(kwargs.pop("directory")) + return self._from_parquet(path=path, members=members, scan=True, **kwargs) + + def read_collection( + self, members: Iterable[str], **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + path = Path(kwargs.pop("directory")) + return self._from_parquet(path=path, members=members, scan=False, **kwargs) + + def _from_parquet( + self, path: Path, members: Iterable[str], scan: bool, **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + data = {} + collection_types = [] + + for key in members: + if (source_path := self._member_source_path(path, key)) is not None: + data[key] = ( + pl.scan_parquet(source_path, **kwargs) + if scan + else pl.read_parquet(source_path, **kwargs).lazy() + ) + if source_path.is_file(): + collection_types.append(_read_serialized_collection(source_path)) + else: + for file in source_path.glob("**/*.parquet"): + collection_types.append(_read_serialized_collection(file)) + + # Backward compatibility: If the parquets do not have schema information, + # fall back to looking for schema.json + if (not collection_types) and (schema_file := path / "schema.json").exists(): + collection_types.append(schema_file.read_text()) + + return data, collection_types + + @classmethod + def _member_source_path(cls, base_path: Path, name: str) -> Path | None: + if (path := base_path / name).exists() and base_path.is_dir(): + # We assume that the member is stored as a hive-partitioned dataset + return path + if (path := base_path / f"{name}.parquet").exists(): + # We assume that the member is stored as a single parquet file + return path + return None + + +def _read_serialized_collection(path: Path) -> SerializedCollection | None: + meta = pl.read_parquet_metadata(path) + return meta.get(COLLECTION_METADATA_KEY) diff --git a/dataframely/collection.py b/dataframely/collection.py index fe231ae6..9170cbf6 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -759,14 +759,12 @@ def read_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - path = Path(directory) - data, collection_type = cls._from_parquet(path, scan=False, **kwargs) - if not cls._requires_validation_for_reading_parquets( - path, collection_type, validation - ): - cls._validate_input_keys(data) - return cls._init(data) - return cls.validate(data, cast=True) + return cls._read( + io=ParquetIO(), + validation=validation, + directory=directory, + **kwargs, + ) @classmethod def scan_parquet( @@ -826,44 +824,12 @@ def scan_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. 
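        Example:
            Purely illustrative (``MyCollection`` is a stand-in for any concrete
            subclass whose members were previously written with
            :meth:`write_parquet`)::

                collection = MyCollection.read_parquet("exported/", validation="warn")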
""" - path = Path(directory) - data, collection_type = cls._from_parquet(path, scan=True, **kwargs) - if not cls._requires_validation_for_reading_parquets( - path, collection_type, validation - ): - cls._validate_input_keys(data) - return cls._init(data) - return cls.validate(data, cast=True) - - @classmethod - def _from_parquet( - cls, path: Path, scan: bool, **kwargs: Any - ) -> tuple[dict[str, pl.LazyFrame], type[Collection] | None]: - data = {} - collection_types = set() - for key in cls.members(): - if (source_path := cls._member_source_path(path, key)) is not None: - data[key] = ( - pl.scan_parquet(source_path, **kwargs) - if scan - else pl.read_parquet(source_path, **kwargs).lazy() - ) - if source_path.is_file(): - collection_types.add(read_parquet_metadata_collection(source_path)) - else: - for file in source_path.glob("**/*.parquet"): - collection_types.add(read_parquet_metadata_collection(file)) - collection_type = _reconcile_collection_types(collection_types) - - # Backward compatibility: If the parquets do not have schema information, - # fall back to looking for schema.json - if (collection_type is None) and (schema_file := path / "schema.json").exists(): - try: - collection_type = deserialize_collection(schema_file.read_text()) - except JSONDecodeError: - pass - - return data, collection_type + return cls._scan( + io=ParquetIO(), + validation=validation, + directory=directory, + **kwargs, + ) def _write(self, io: DataFramelyIO, directory: Path | str) -> None: io.write_collection( @@ -886,19 +852,52 @@ def _sink(self, io: DataFramelyIO, directory: Path | str) -> None: ) @classmethod - def _member_source_path(cls, base_path: Path, name: str) -> Path | None: - if (path := base_path / name).exists() and base_path.is_dir(): - # We assume that the member is stored as a hive-partitioned dataset - return path - if (path := base_path / f"{name}.parquet").exists(): - # We assume that the member is stored as a single parquet file - return path - return None + def _scan(cls, io: DataFramelyIO, validation: Validation, **kwargs: Any) -> Self: + data, serialized_collection_types = io.read_collection( + members=cls.member_schemas().keys(), **kwargs + ) + collection_types = [] + collection_type: type[Collection] | None = None + for t in serialized_collection_types: + if t is None: + continue + try: + collection_type = deserialize_collection(t) + collection_types.append(collection_type) + except JSONDecodeError: + pass + + collection_type = _reconcile_collection_types(collection_types) + + if cls._requires_validation_for_reading_parquets(collection_type, validation): + return cls.validate(data, cast=True) + return cls.cast(data) + + @classmethod + def _read(cls, io: DataFramelyIO, validation: Validation, **kwargs: Any) -> Self: + data, serialized_collection_types = io.scan_collection( + members=cls.member_schemas().keys(), **kwargs + ) + collection_types = [] + collection_type: type[Collection] | None = None + for t in serialized_collection_types: + if t is None: + continue + try: + collection_type = deserialize_collection(t) + collection_types.append(collection_type) + except JSONDecodeError: + pass + + collection_type = _reconcile_collection_types(collection_types) + + if cls._requires_validation_for_reading_parquets(collection_type, validation): + return cls.validate(data, cast=True) + return cls.cast(data) @classmethod def _requires_validation_for_reading_parquets( cls, - directory: Path, collection_type: type[Collection] | None, validation: Validation, ) -> bool: @@ -917,12 +916,10 @@ def 
_requires_validation_for_reading_parquets( ) if validation == "forbid": raise ValidationRequiredError( - f"Cannot read collection from '{directory!r}' without validation: {msg}." + f"Cannot read collection without validation: {msg}." ) if validation == "warn": - warnings.warn( - f"Reading parquet file from '{directory!r}' requires validation: {msg}." - ) + warnings.warn(f"Reading parquet file requires validation: {msg}.") return True # ----------------------------------- UTILITIES ---------------------------------- # From a065dcaba535f36a2595f7e3c980ffefd915a3c4 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 11:42:12 +0200 Subject: [PATCH 04/25] fix --- dataframely/_serialization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index 2e511d4a..4d8079e9 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -292,7 +292,7 @@ def _from_parquet( # Backward compatibility: If the parquets do not have schema information, # fall back to looking for schema.json - if (not collection_types) and (schema_file := path / "schema.json").exists(): + if not any(collection_types) and (schema_file := path / "schema.json").exists(): collection_types.append(schema_file.read_text()) return data, collection_types From 0be742bd9ffc902b7d466836f124d366d0cf33ed Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 15:24:01 +0200 Subject: [PATCH 05/25] fix --- dataframely/_serialization.py | 235 +++++++++++++++++++++++++++++----- dataframely/collection.py | 48 ++----- dataframely/schema.py | 30 +++-- 3 files changed, 232 insertions(+), 81 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index 4d8079e9..4400ac87 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -120,29 +120,74 @@ def object_hook(self, dct: dict[str, Any]) -> Any: SerializedCollection = str -class DataFramelyIO(ABC): - # --------------------------- Individual tables ------------------------------------ +class IOManager(ABC): + """Base class for IO managers. + + An IO manager encapsulates a way of serializing and deserializing dataframlely + data-/lazyframes and collections. This base class provides a unified interface for + all such use cases. + + The interface is designed to operate data provided as polars frames, and metadata + provided serialized strings. This design is meant to limit the coupling between the + Schema/Collection classes and specifics of how data and metadata is stored. + """ + + # ----------------------------------- Schemas ------------------------------------- @abstractmethod - def sink_table( + def sink_schema( self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any - ) -> None: ... + ) -> None: + """Stream the contents of a dataframe, and its metadata to the storage backend. + + Args: + lf: A frame containing the data to be stored. + serialized_schema: String-serialized schema information. + kwargs: Additional keyword arguments to pass to the underlying storage + implementation. + """ @abstractmethod - def write_table( + def write_schema( self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any - ) -> None: ... + ) -> None: + """Write the contents of a dataframe, and its metadata to the storage backend. + + Args: + df: A dataframe containing the data to be stored. + frame: String-serialized schema information. 
+ kwargs: Additional keyword arguments to pass to the underlying storage + implementation. + """ @abstractmethod - def scan_table( + def scan_schema( self, **kwargs: Any - ) -> tuple[pl.LazyFrame, SerializedSchema | None]: ... + ) -> tuple[pl.LazyFrame, SerializedSchema | None]: + """Lazily read frame data and metadata from the storage backend. + + Args: + kwargs: Keyword arguments to pass to the underlying storage. + Refer to the individual implementation to see which keywords + are available. + Returns: + A tuple of the lazy frame data and metadata if available. + """ @abstractmethod - def read_table( + def read_schema( self, **kwargs: Any - ) -> tuple[pl.DataFrame, SerializedSchema | None]: ... - - # # --------------------------- Collections ------------------------------------ + ) -> tuple[pl.DataFrame, SerializedSchema | None]: + """Eagerly read frame data and metadata from the storage backend. + + Args: + kwargs: Keyword arguments to pass to the underlying storage. + Refer to the individual implementation to see which keywords + are available. + Returns: + A tuple of the lazy frame data and metadata if available. + """ + + # ------------------------------ Collections --------------------------------------- @abstractmethod def sink_collection( self, @@ -150,7 +195,17 @@ def sink_collection( serialized_collection: SerializedCollection, serialized_schemas: dict[str, str], **kwargs: Any, - ) -> None: ... + ) -> None: + """Stream the members of this collection into the storage backend. + + Args: + dfs: Dictionary containing the data to be stored. + serialized_collection: String-serialized information about the origin Collection. + serialized_schemas: String-serialized information about the individual Schemas + for each of the member frames. This information is also logically included + in the collection metadata, but it is passed separately here to ensure that + each member can also be read back as an individual frame. + """ @abstractmethod def write_collection( @@ -159,24 +214,78 @@ def write_collection( serialized_collection: SerializedCollection, serialized_schemas: dict[str, str], **kwargs: Any, - ) -> None: ... + ) -> None: + """Write the members of this collection into the storage backend. + + Args: + dfs: Dictionary containing the data to be stored. + serialized_collection: String-serialized information about the origin Collection. + serialized_schemas: String-serialized information about the individual Schemas + for each of the member frames. This information is also logically included + in the collection metadata, but it is passed separately here to ensure that + each member can also be read back as an individual frame. + """ @abstractmethod def scan_collection( self, members: Iterable[str], **kwargs: Any - ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: ... + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + """Lazily read all collection members from the storage backend. + + Args: + members: Collection member names to read. + kwargs: Additional keyword arguments to pass to the underlying storage. + Refer to the individual implementation to see which keywords are available. + Returns: + A tuple of the collection data and metadata if available. + Depending on the storage implementation, multiple copies of the metadata + may be available, which are returned as a list. + It is up to the caller to decide how to handle the presence/absence/consistency + of the returned values. 
+ """ @abstractmethod def read_collection( self, members: Iterable[str], **kwargs: Any - ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: ... + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + """Lazily read all collection members from the storage backend. + Args: + members: Collection member names to read. + kwargs: Additional keyword arguments to pass to the underlying storage. + Refer to the individual implementation to see which keywords are available. + Returns: + A tuple of the collection data and metadata if available. + Depending on the storage implementation, multiple copies of the metadata + may be available, which are returned as a list. + It is up to the caller to decide how to handle the presence/absence/consistency + of the returned values. + """ -class ParquetIO(DataFramelyIO): - # --------------------------- Schema ----------------------------------------------- - def sink_table( + +class ParquetIOManager(IOManager): + """IO manager that stores data and metadata in parquet files on a file system. + + Single frames are stored as individual parquet files + + Collections are stored as directories. + """ + + # ----------------------------------- Schemas ------------------------------------- + def sink_schema( self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: + """This method stores frames as individual parquet files. + + Args: + lf: LazyFrame containing the data to be stored. + kwargs: The "file" kwarg is required to specify where data is stored. + It should point to a parquet file. If hive partitioning is used, + it should point to a directory. + The "metadata" kwarg is supported to pass a dictionary of parquet + metadata. + Additional keyword arguments are passed to polars. + """ file = kwargs.pop("file") metadata = kwargs.pop("metadata", {}) lf.sink_parquet( @@ -185,9 +294,20 @@ def sink_table( **kwargs, ) - def write_table( + def write_schema( self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: + """This method stores frames as individual parquet files. + + Args: + df: DataFrame containing the data to be stored. + kwargs: The "file" kwarg is required to specify where data is stored. + It should point to a parquet file. If hive partitioning is used, + it should point to a directory. + The "metadata" kwarg is supported to pass a dictionary of parquet + metadata. + Additional keyword arguments are passed to polars. + """ file = kwargs.pop("file") metadata = kwargs.pop("metadata", {}) df.write_parquet( @@ -196,19 +316,35 @@ def write_table( **kwargs, ) - def scan_table(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: + def scan_schema( + self, **kwargs: Any + ) -> tuple[pl.LazyFrame, SerializedSchema | None]: + """Lazily read single frames from parquet. + + Args: + kwargs: The "source" kwarg is required to specify where data is stored. + Other kwargs are passed to polars. + """ source = kwargs.pop("source") lf = pl.scan_parquet(source, **kwargs) - metadata = pl.read_parquet_metadata(source).get(SCHEMA_METADATA_KEY) + metadata = _read_serialized_schema(source) return lf, metadata - def read_table(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: + def read_schema( + self, **kwargs: Any + ) -> tuple[pl.DataFrame, SerializedSchema | None]: + """Eagerly read single frames from parquet. + + Args: + kwargs: The "source" kwarg is required to specify where data is stored. + Other kwargs are passed to polars. 
+ """ source = kwargs.pop("source") df = pl.read_parquet(source, **kwargs) - metadata = pl.read_parquet_metadata(source).get(SCHEMA_METADATA_KEY) + metadata = _read_serialized_schema(source) return df, metadata - # ----------------------------- Collection ----------------------------------------- + # ------------------------------ Collections --------------------------------------- def sink_collection( self, dfs: dict[str, pl.LazyFrame], @@ -216,6 +352,14 @@ def sink_collection( serialized_schemas: dict[str, str], **kwargs: Any, ) -> None: + """Stream multiple frames to parquet. + + Args: + dfs: See base class. + serialized_collection: See base class. + serialized_schemas: See base class. + kwargs: The "directory" kwarg is required to specify where data is stored. + """ path = Path(kwargs.pop("directory")) # The collection schema is serialized as part of the member parquet metadata @@ -227,7 +371,7 @@ def sink_collection( destination = ( path / key if "partition_by" in kwargs else path / f"{key}.parquet" ) - self.sink_table( + self.sink_schema( lf, serialized_schema=serialized_schemas[key], file=destination, @@ -241,6 +385,14 @@ def write_collection( serialized_schemas: dict[str, str], **kwargs: Any, ) -> None: + """Write multiple frames to parquet. + + Args: + dfs: See base class. + serialized_collection: See base class. + serialized_schemas: See base class. + kwargs: The "directory" kwarg is required to specify where data is stored. + """ path = Path(kwargs.pop("directory")) # The collection schema is serialized as part of the member parquet metadata @@ -252,7 +404,7 @@ def write_collection( destination = ( path / key if "partition_by" in kwargs else path / f"{key}.parquet" ) - self.sink_table( + self.sink_schema( lf, serialized_schema=serialized_schemas[key], file=destination, @@ -262,18 +414,36 @@ def write_collection( def scan_collection( self, members: Iterable[str], **kwargs: Any ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + """Lazily read multiple frames from parquet. + + Args: + members: See base class. + kwargs: The "directory" kwarg is required to specify where data is stored. + """ path = Path(kwargs.pop("directory")) - return self._from_parquet(path=path, members=members, scan=True, **kwargs) + return self._collection_from_parquet( + path=path, members=members, scan=True, **kwargs + ) def read_collection( self, members: Iterable[str], **kwargs: Any ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + """Eagerly read multiple frames from parquet. + + Args: + members: See base class. + kwargs: The "directory" kwarg is required to specify where data is stored. 
+ """ path = Path(kwargs.pop("directory")) - return self._from_parquet(path=path, members=members, scan=False, **kwargs) + return self._collection_from_parquet( + path=path, members=members, scan=False, **kwargs + ) - def _from_parquet( + def _collection_from_parquet( self, path: Path, members: Iterable[str], scan: bool, **kwargs: Any ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + # Utility method encapsulating the logic that is common + # between lazy and eager reads data = {} collection_types = [] @@ -311,3 +481,8 @@ def _member_source_path(cls, base_path: Path, name: str) -> Path | None: def _read_serialized_collection(path: Path) -> SerializedCollection | None: meta = pl.read_parquet_metadata(path) return meta.get(COLLECTION_METADATA_KEY) + + +def _read_serialized_schema(path: Path) -> SerializedSchema | None: + meta = pl.read_parquet_metadata(path) + return meta.get(SCHEMA_METADATA_KEY) diff --git a/dataframely/collection.py b/dataframely/collection.py index 9170cbf6..a97e13d7 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -21,8 +21,8 @@ from ._serialization import ( COLLECTION_METADATA_KEY, SERIALIZATION_FORMAT_VERSION, - DataFramelyIO, - ParquetIO, + IOManager, + ParquetIOManager, SchemaJSONDecoder, SchemaJSONEncoder, serialization_versions, @@ -654,7 +654,7 @@ def write_parquet(self, directory: str | Path, **kwargs: Any) -> None: Attention: This method suffers from the same limitations as :meth:`Schema.serialize`. """ - self._write(ParquetIO(), directory=directory) + self._write(ParquetIOManager(), directory=directory) def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None: """Stream the members of this collection into parquet files in a directory. @@ -674,35 +674,7 @@ def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None: Attention: This method suffers from the same limitations as :meth:`Schema.serialize`. """ - self._sink(ParquetIO(), directory=directory) - - # - # def _to_parquet(self, directory: str | Path, *, sink: bool, **kwargs: Any) -> None: - # path = Path(directory) if isinstance(directory, str) else directory - # path.mkdir(parents=True, exist_ok=True) - # - # # The collection schema is serialized as part of the member parquet metadata - # kwargs["metadata"] = kwargs.get("metadata", {}) | { - # COLLECTION_METADATA_KEY: self.serialize() - # } - # - # member_schemas = self.member_schemas() - # for key, lf in self.to_dict().items(): - # destination = ( - # path / key if "partition_by" in kwargs else path / f"{key}.parquet" - # ) - # if sink: - # member_schemas[key].sink_parquet( - # lf, # type: ignore - # destination, - # **kwargs, - # ) - # else: - # member_schemas[key].write_parquet( - # lf.collect(), # type: ignore - # destination, - # **kwargs, - # ) + self._sink(ParquetIOManager(), directory=directory) @classmethod def read_parquet( @@ -760,7 +732,7 @@ def read_parquet( :meth:`serialize`. """ return cls._read( - io=ParquetIO(), + io=ParquetIOManager(), validation=validation, directory=directory, **kwargs, @@ -825,13 +797,13 @@ def scan_parquet( :meth:`serialize`. 
""" return cls._scan( - io=ParquetIO(), + io=ParquetIOManager(), validation=validation, directory=directory, **kwargs, ) - def _write(self, io: DataFramelyIO, directory: Path | str) -> None: + def _write(self, io: IOManager, directory: Path | str) -> None: io.write_collection( self.to_dict(), serialized_collection=self.serialize(), @@ -841,7 +813,7 @@ def _write(self, io: DataFramelyIO, directory: Path | str) -> None: directory=directory, ) - def _sink(self, io: DataFramelyIO, directory: Path | str) -> None: + def _sink(self, io: IOManager, directory: Path | str) -> None: io.sink_collection( self.to_dict(), serialized_collection=self.serialize(), @@ -852,7 +824,7 @@ def _sink(self, io: DataFramelyIO, directory: Path | str) -> None: ) @classmethod - def _scan(cls, io: DataFramelyIO, validation: Validation, **kwargs: Any) -> Self: + def _scan(cls, io: IOManager, validation: Validation, **kwargs: Any) -> Self: data, serialized_collection_types = io.read_collection( members=cls.member_schemas().keys(), **kwargs ) @@ -874,7 +846,7 @@ def _scan(cls, io: DataFramelyIO, validation: Validation, **kwargs: Any) -> Self return cls.cast(data) @classmethod - def _read(cls, io: DataFramelyIO, validation: Validation, **kwargs: Any) -> Self: + def _read(cls, io: IOManager, validation: Validation, **kwargs: Any) -> Self: data, serialized_collection_types = io.scan_collection( members=cls.member_schemas().keys(), **kwargs ) diff --git a/dataframely/schema.py b/dataframely/schema.py index 0e86e0fe..62b97766 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -22,8 +22,8 @@ from ._serialization import ( SCHEMA_METADATA_KEY, SERIALIZATION_FORMAT_VERSION, - DataFramelyIO, - ParquetIO, + IOManager, + ParquetIOManager, SchemaJSONDecoder, SchemaJSONEncoder, serialization_versions, @@ -715,7 +715,7 @@ def write_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - cls._write(df=df, io=ParquetIO(), file=file, **kwargs) + cls._write(df=df, io=ParquetIOManager(), file=file, **kwargs) @classmethod def sink_parquet( @@ -743,7 +743,7 @@ def sink_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - return cls._sink(lf=lf, io=ParquetIO(), file=file, **kwargs) + return cls._sink(lf=lf, io=ParquetIOManager(), file=file, **kwargs) @classmethod def read_parquet( @@ -792,7 +792,9 @@ def read_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - return cls._read(ParquetIO(), validation=validation, source=source, **kwargs) + return cls._read( + ParquetIOManager(), validation=validation, source=source, **kwargs + ) @classmethod def scan_parquet( @@ -846,7 +848,9 @@ def scan_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. 
""" - return cls._scan(ParquetIO(), validation=validation, source=source, **kwargs) + return cls._scan( + ParquetIOManager(), validation=validation, source=source, **kwargs + ) @classmethod def _requires_validation_for_reading_parquet( @@ -885,21 +889,21 @@ def _requires_validation_for_reading_parquet( # ------------------------------------- IO --------------------------------------- # @classmethod - def _write(cls, df: pl.DataFrame, io: DataFramelyIO, **kwargs: Any) -> None: - io.write_table(df=df, serialized_schema=cls.serialize(), **kwargs) + def _write(cls, df: pl.DataFrame, io: IOManager, **kwargs: Any) -> None: + io.write_schema(df=df, serialized_schema=cls.serialize(), **kwargs) @classmethod - def _sink(cls, lf: pl.LazyFrame, io: DataFramelyIO, **kwargs: Any) -> None: - io.sink_table(lf=lf, serialized_schema=cls.serialize(), **kwargs) + def _sink(cls, lf: pl.LazyFrame, io: IOManager, **kwargs: Any) -> None: + io.sink_schema(lf=lf, serialized_schema=cls.serialize(), **kwargs) @classmethod def _scan( - cls, io: DataFramelyIO, validation: Validation, **kwargs: Any + cls, io: IOManager, validation: Validation, **kwargs: Any ) -> LazyFrame[Self]: source = kwargs.pop("source") # Load - df, serialized_schema = io.scan_table(source=source) + df, serialized_schema = io.scan_schema(source=source) deserialized_schema = ( deserialize_schema(serialized_schema) if serialized_schema else None ) @@ -913,7 +917,7 @@ def _scan( @classmethod def _read( - cls, io: DataFramelyIO, validation: Validation, **kwargs: Any + cls, io: IOManager, validation: Validation, **kwargs: Any ) -> DataFrame[Self]: return cls._scan(io=io, validation=validation, **kwargs).collect() From c79fadc92d9f191cb0ba33c2117abf9a0196325d Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 15:25:46 +0200 Subject: [PATCH 06/25] fix --- dataframely/_serialization.py | 28 ++++++++++------------------ dataframely/collection.py | 8 ++++++++ dataframely/schema.py | 6 +++--- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index 4400ac87..f0c89ef9 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -134,7 +134,7 @@ class IOManager(ABC): # ----------------------------------- Schemas ------------------------------------- @abstractmethod - def sink_schema( + def sink_frame( self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: """Stream the contents of a dataframe, and its metadata to the storage backend. @@ -147,7 +147,7 @@ def sink_schema( """ @abstractmethod - def write_schema( + def write_frame( self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: """Write the contents of a dataframe, and its metadata to the storage backend. @@ -160,9 +160,7 @@ def write_schema( """ @abstractmethod - def scan_schema( - self, **kwargs: Any - ) -> tuple[pl.LazyFrame, SerializedSchema | None]: + def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: """Lazily read frame data and metadata from the storage backend. Args: @@ -174,9 +172,7 @@ def scan_schema( """ @abstractmethod - def read_schema( - self, **kwargs: Any - ) -> tuple[pl.DataFrame, SerializedSchema | None]: + def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: """Eagerly read frame data and metadata from the storage backend. 
Args: @@ -272,7 +268,7 @@ class ParquetIOManager(IOManager): """ # ----------------------------------- Schemas ------------------------------------- - def sink_schema( + def sink_frame( self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: """This method stores frames as individual parquet files. @@ -294,7 +290,7 @@ def sink_schema( **kwargs, ) - def write_schema( + def write_frame( self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: """This method stores frames as individual parquet files. @@ -316,9 +312,7 @@ def write_schema( **kwargs, ) - def scan_schema( - self, **kwargs: Any - ) -> tuple[pl.LazyFrame, SerializedSchema | None]: + def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: """Lazily read single frames from parquet. Args: @@ -330,9 +324,7 @@ def scan_schema( metadata = _read_serialized_schema(source) return lf, metadata - def read_schema( - self, **kwargs: Any - ) -> tuple[pl.DataFrame, SerializedSchema | None]: + def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: """Eagerly read single frames from parquet. Args: @@ -371,7 +363,7 @@ def sink_collection( destination = ( path / key if "partition_by" in kwargs else path / f"{key}.parquet" ) - self.sink_schema( + self.sink_frame( lf, serialized_schema=serialized_schemas[key], file=destination, @@ -404,7 +396,7 @@ def write_collection( destination = ( path / key if "partition_by" in kwargs else path / f"{key}.parquet" ) - self.sink_schema( + self.sink_frame( lf, serialized_schema=serialized_schemas[key], file=destination, diff --git a/dataframely/collection.py b/dataframely/collection.py index a97e13d7..f6202c04 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -804,6 +804,8 @@ def scan_parquet( ) def _write(self, io: IOManager, directory: Path | str) -> None: + # Utility method encapsulating the interaction with the IOManager + io.write_collection( self.to_dict(), serialized_collection=self.serialize(), @@ -814,6 +816,8 @@ def _write(self, io: IOManager, directory: Path | str) -> None: ) def _sink(self, io: IOManager, directory: Path | str) -> None: + # Utility method encapsulating the interaction with the IOManager + io.sink_collection( self.to_dict(), serialized_collection=self.serialize(), @@ -825,6 +829,8 @@ def _sink(self, io: IOManager, directory: Path | str) -> None: @classmethod def _scan(cls, io: IOManager, validation: Validation, **kwargs: Any) -> Self: + # Utility method encapsulating the interaction with the IOManager + data, serialized_collection_types = io.read_collection( members=cls.member_schemas().keys(), **kwargs ) @@ -847,6 +853,8 @@ def _scan(cls, io: IOManager, validation: Validation, **kwargs: Any) -> Self: @classmethod def _read(cls, io: IOManager, validation: Validation, **kwargs: Any) -> Self: + # Utility method encapsulating the interaction with the IOManager + data, serialized_collection_types = io.scan_collection( members=cls.member_schemas().keys(), **kwargs ) diff --git a/dataframely/schema.py b/dataframely/schema.py index 62b97766..c16d51e8 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -890,11 +890,11 @@ def _requires_validation_for_reading_parquet( # ------------------------------------- IO --------------------------------------- # @classmethod def _write(cls, df: pl.DataFrame, io: IOManager, **kwargs: Any) -> None: - io.write_schema(df=df, serialized_schema=cls.serialize(), **kwargs) + io.write_frame(df=df, 
serialized_schema=cls.serialize(), **kwargs) @classmethod def _sink(cls, lf: pl.LazyFrame, io: IOManager, **kwargs: Any) -> None: - io.sink_schema(lf=lf, serialized_schema=cls.serialize(), **kwargs) + io.sink_frame(lf=lf, serialized_schema=cls.serialize(), **kwargs) @classmethod def _scan( @@ -903,7 +903,7 @@ def _scan( source = kwargs.pop("source") # Load - df, serialized_schema = io.scan_schema(source=source) + df, serialized_schema = io.scan_frame(source=source) deserialized_schema = ( deserialize_schema(serialized_schema) if serialized_schema else None ) From 5cfca181ab996cd33d322d5fdd6a44031227d39a Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 15:34:41 +0200 Subject: [PATCH 07/25] fix --- dataframely/collection.py | 63 ++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 37 deletions(-) diff --git a/dataframely/collection.py b/dataframely/collection.py index f6202c04..21b06861 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -735,6 +735,7 @@ def read_parquet( io=ParquetIOManager(), validation=validation, directory=directory, + lazy=False, **kwargs, ) @@ -796,10 +797,11 @@ def scan_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - return cls._scan( + return cls._read( io=ParquetIOManager(), validation=validation, directory=directory, + lazy=True, **kwargs, ) @@ -828,47 +830,17 @@ def _sink(self, io: IOManager, directory: Path | str) -> None: ) @classmethod - def _scan(cls, io: IOManager, validation: Validation, **kwargs: Any) -> Self: - # Utility method encapsulating the interaction with the IOManager - - data, serialized_collection_types = io.read_collection( - members=cls.member_schemas().keys(), **kwargs - ) - collection_types = [] - collection_type: type[Collection] | None = None - for t in serialized_collection_types: - if t is None: - continue - try: - collection_type = deserialize_collection(t) - collection_types.append(collection_type) - except JSONDecodeError: - pass - - collection_type = _reconcile_collection_types(collection_types) - - if cls._requires_validation_for_reading_parquets(collection_type, validation): - return cls.validate(data, cast=True) - return cls.cast(data) - - @classmethod - def _read(cls, io: IOManager, validation: Validation, **kwargs: Any) -> Self: + def _read( + cls, io: IOManager, validation: Validation, lazy: bool, **kwargs: Any + ) -> Self: # Utility method encapsulating the interaction with the IOManager - data, serialized_collection_types = io.scan_collection( + scan_function = io.scan_collection if lazy else io.read_collection + data, serialized_collection_types = scan_function( members=cls.member_schemas().keys(), **kwargs ) - collection_types = [] - collection_type: type[Collection] | None = None - for t in serialized_collection_types: - if t is None: - continue - try: - collection_type = deserialize_collection(t) - collection_types.append(collection_type) - except JSONDecodeError: - pass + collection_types = _deserialize_types(serialized_collection_types) collection_type = _reconcile_collection_types(collection_types) if cls._requires_validation_for_reading_parquets(collection_type, validation): @@ -1010,6 +982,23 @@ def _extract_keys_if_exist( return {key: data[key] for key in keys if key in data} +def _deserialize_types( + serialized_collection_types: Iterable[str | None], +) -> list[type[Collection]]: + collection_types = [] + collection_type: type[Collection] | None = None + for t in serialized_collection_types: + if t is 
None: + continue + try: + collection_type = deserialize_collection(t) + collection_types.append(collection_type) + except JSONDecodeError: + pass + + return collection_types + + def _reconcile_collection_types( collection_types: Iterable[type[Collection] | None], ) -> type[Collection] | None: From 85b569719ab198e2481b9fdcf4fef8bf2899501f Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 15:40:51 +0200 Subject: [PATCH 08/25] wip --- dataframely/_serialization.py | 4 ++-- dataframely/collection.py | 24 ++++++++++++------------ dataframely/schema.py | 20 ++++++++++---------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index f14c7907..e3d68886 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -126,7 +126,7 @@ def object_hook(self, dct: dict[str, Any]) -> Any: SerializedCollection = str -class IOManager(ABC): +class StorageBackend(ABC): """Base class for IO managers. An IO manager encapsulates a way of serializing and deserializing dataframlely @@ -265,7 +265,7 @@ def read_collection( """ -class ParquetIOManager(IOManager): +class ParquetStorageBackend(StorageBackend): """IO manager that stores data and metadata in parquet files on a file system. Single frames are stored as individual parquet files diff --git a/dataframely/collection.py b/dataframely/collection.py index cf53f2ee..ba2a23f0 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -21,10 +21,10 @@ from ._serialization import ( COLLECTION_METADATA_KEY, SERIALIZATION_FORMAT_VERSION, - IOManager, - ParquetIOManager, + ParquetStorageBackend, SchemaJSONDecoder, SchemaJSONEncoder, + StorageBackend, serialization_versions, ) from ._typing import LazyFrame, Validation @@ -667,7 +667,7 @@ def write_parquet(self, directory: str | Path, **kwargs: Any) -> None: Attention: This method suffers from the same limitations as :meth:`Schema.serialize`. """ - self._write(ParquetIOManager(), directory=directory) + self._write(ParquetStorageBackend(), directory=directory) def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None: """Stream the members of this collection into parquet files in a directory. @@ -687,7 +687,7 @@ def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None: Attention: This method suffers from the same limitations as :meth:`Schema.serialize`. """ - self._sink(ParquetIOManager(), directory=directory) + self._sink(ParquetStorageBackend(), directory=directory) @classmethod def read_parquet( @@ -745,7 +745,7 @@ def read_parquet( :meth:`serialize`. """ return cls._read( - io=ParquetIOManager(), + io=ParquetStorageBackend(), validation=validation, directory=directory, lazy=False, @@ -811,15 +811,15 @@ def scan_parquet( :meth:`serialize`. 
""" return cls._read( - io=ParquetIOManager(), + io=ParquetStorageBackend(), validation=validation, directory=directory, lazy=True, **kwargs, ) - def _write(self, io: IOManager, directory: Path | str) -> None: - # Utility method encapsulating the interaction with the IOManager + def _write(self, io: StorageBackend, directory: Path | str) -> None: + # Utility method encapsulating the interaction with the StorageBackend io.write_collection( self.to_dict(), @@ -830,8 +830,8 @@ def _write(self, io: IOManager, directory: Path | str) -> None: directory=directory, ) - def _sink(self, io: IOManager, directory: Path | str) -> None: - # Utility method encapsulating the interaction with the IOManager + def _sink(self, io: StorageBackend, directory: Path | str) -> None: + # Utility method encapsulating the interaction with the StorageBackend io.sink_collection( self.to_dict(), @@ -844,9 +844,9 @@ def _sink(self, io: IOManager, directory: Path | str) -> None: @classmethod def _read( - cls, io: IOManager, validation: Validation, lazy: bool, **kwargs: Any + cls, io: StorageBackend, validation: Validation, lazy: bool, **kwargs: Any ) -> Self: - # Utility method encapsulating the interaction with the IOManager + # Utility method encapsulating the interaction with the StorageBackend scan_function = io.scan_collection if lazy else io.read_collection data, serialized_collection_types = scan_function( diff --git a/dataframely/schema.py b/dataframely/schema.py index 501900c1..cc6cbaea 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -23,10 +23,10 @@ from ._serialization import ( SCHEMA_METADATA_KEY, SERIALIZATION_FORMAT_VERSION, - IOManager, - ParquetIOManager, + ParquetStorageBackend, SchemaJSONDecoder, SchemaJSONEncoder, + StorageBackend, serialization_versions, ) from ._typing import DataFrame, LazyFrame, Validation @@ -716,7 +716,7 @@ def write_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - cls._write(df=df, io=ParquetIOManager(), file=file, **kwargs) + cls._write(df=df, io=ParquetStorageBackend(), file=file, **kwargs) @classmethod def sink_parquet( @@ -744,7 +744,7 @@ def sink_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - return cls._sink(lf=lf, io=ParquetIOManager(), file=file, **kwargs) + return cls._sink(lf=lf, io=ParquetStorageBackend(), file=file, **kwargs) @classmethod def read_parquet( @@ -794,7 +794,7 @@ def read_parquet( :meth:`serialize`. """ return cls._read( - ParquetIOManager(), validation=validation, source=source, **kwargs + ParquetStorageBackend(), validation=validation, source=source, **kwargs ) @classmethod @@ -850,7 +850,7 @@ def scan_parquet( :meth:`serialize`. 
""" return cls._scan( - ParquetIOManager(), validation=validation, source=source, **kwargs + ParquetStorageBackend(), validation=validation, source=source, **kwargs ) @classmethod @@ -890,16 +890,16 @@ def _requires_validation_for_reading_parquet( # ------------------------------------- IO --------------------------------------- # @classmethod - def _write(cls, df: pl.DataFrame, io: IOManager, **kwargs: Any) -> None: + def _write(cls, df: pl.DataFrame, io: StorageBackend, **kwargs: Any) -> None: io.write_frame(df=df, serialized_schema=cls.serialize(), **kwargs) @classmethod - def _sink(cls, lf: pl.LazyFrame, io: IOManager, **kwargs: Any) -> None: + def _sink(cls, lf: pl.LazyFrame, io: StorageBackend, **kwargs: Any) -> None: io.sink_frame(lf=lf, serialized_schema=cls.serialize(), **kwargs) @classmethod def _scan( - cls, io: IOManager, validation: Validation, **kwargs: Any + cls, io: StorageBackend, validation: Validation, **kwargs: Any ) -> LazyFrame[Self]: source = kwargs.pop("source") @@ -918,7 +918,7 @@ def _scan( @classmethod def _read( - cls, io: IOManager, validation: Validation, **kwargs: Any + cls, io: StorageBackend, validation: Validation, **kwargs: Any ) -> DataFrame[Self]: return cls._scan(io=io, validation=validation, **kwargs).collect() From 4fe0c12d651b076fea2ec54c718a55bf65b5285e Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 15:41:13 +0200 Subject: [PATCH 09/25] wip --- dataframely/_serialization.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index e3d68886..2779da13 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -1,5 +1,6 @@ # Copyright (c) QuantCo 2025-2025 # SPDX-License-Identifier: BSD-3-Clause + import base64 import datetime as dt import decimal From 637d9f92dffb175f9de4012f29f3d9a128ad3739 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 16:35:19 +0200 Subject: [PATCH 10/25] wip --- dataframely/_serialization.py | 4 ++-- dataframely/collection.py | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index 2779da13..16aa15c2 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -403,8 +403,8 @@ def write_collection( destination = ( path / key if "partition_by" in kwargs else path / f"{key}.parquet" ) - self.sink_frame( - lf, + self.write_frame( + lf.collect(), serialized_schema=serialized_schemas[key], file=destination, **kwargs, diff --git a/dataframely/collection.py b/dataframely/collection.py index ba2a23f0..762eb538 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -667,7 +667,7 @@ def write_parquet(self, directory: str | Path, **kwargs: Any) -> None: Attention: This method suffers from the same limitations as :meth:`Schema.serialize`. """ - self._write(ParquetStorageBackend(), directory=directory) + self._write(ParquetStorageBackend(), directory=directory, **kwargs) def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None: """Stream the members of this collection into parquet files in a directory. @@ -687,7 +687,7 @@ def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None: Attention: This method suffers from the same limitations as :meth:`Schema.serialize`. 
""" - self._sink(ParquetStorageBackend(), directory=directory) + self._sink(ParquetStorageBackend(), directory=directory, **kwargs) @classmethod def read_parquet( @@ -818,7 +818,7 @@ def scan_parquet( **kwargs, ) - def _write(self, io: StorageBackend, directory: Path | str) -> None: + def _write(self, io: StorageBackend, directory: Path | str, **kwargs: Any) -> None: # Utility method encapsulating the interaction with the StorageBackend io.write_collection( @@ -828,9 +828,10 @@ def _write(self, io: StorageBackend, directory: Path | str) -> None: key: schema.serialize() for key, schema in self.member_schemas().items() }, directory=directory, + **kwargs, ) - def _sink(self, io: StorageBackend, directory: Path | str) -> None: + def _sink(self, io: StorageBackend, directory: Path | str, **kwargs: Any) -> None: # Utility method encapsulating the interaction with the StorageBackend io.sink_collection( @@ -840,6 +841,7 @@ def _sink(self, io: StorageBackend, directory: Path | str) -> None: key: schema.serialize() for key, schema in self.member_schemas().items() }, directory=directory, + **kwargs, ) @classmethod From f587f650b3a927e22643b7adfca0fe5c3f99c357 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 16:52:46 +0200 Subject: [PATCH 11/25] wip --- dataframely/schema.py | 50 ++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/dataframely/schema.py b/dataframely/schema.py index cc6cbaea..762e1ff7 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -794,7 +794,11 @@ def read_parquet( :meth:`serialize`. """ return cls._read( - ParquetStorageBackend(), validation=validation, source=source, **kwargs + ParquetStorageBackend(), + validation=validation, + lazy=False, + source=source, + **kwargs, ) @classmethod @@ -849,8 +853,12 @@ def scan_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - return cls._scan( - ParquetStorageBackend(), validation=validation, source=source, **kwargs + return cls._read( + ParquetStorageBackend(), + validation=validation, + lazy=True, + source=source, + **kwargs, ) @classmethod @@ -897,14 +905,36 @@ def _write(cls, df: pl.DataFrame, io: StorageBackend, **kwargs: Any) -> None: def _sink(cls, lf: pl.LazyFrame, io: StorageBackend, **kwargs: Any) -> None: io.sink_frame(lf=lf, serialized_schema=cls.serialize(), **kwargs) + @overload @classmethod - def _scan( - cls, io: StorageBackend, validation: Validation, **kwargs: Any - ) -> LazyFrame[Self]: + def _read( + cls, + io: StorageBackend, + validation: Validation, + lazy: Literal[True], + **kwargs: Any, + ) -> LazyFrame[Self]: ... + + @overload + @classmethod + def _read( + cls, + io: StorageBackend, + validation: Validation, + lazy: Literal[False], + **kwargs: Any, + ) -> DataFrame[Self]: ... 
+ + @classmethod + def _read( + cls, io: StorageBackend, validation: Validation, lazy: bool, **kwargs: Any + ) -> LazyFrame[Self] | DataFrame[Self]: + read_function = io.scan_frame if lazy else io.read_frame + source = kwargs.pop("source") # Load - df, serialized_schema = io.scan_frame(source=source) + df, serialized_schema = read_function(source=source) deserialized_schema = ( deserialize_schema(serialized_schema) if serialized_schema else None ) @@ -916,12 +946,6 @@ def _scan( return cls.validate(df, cast=True).lazy() return cls.cast(df) - @classmethod - def _read( - cls, io: StorageBackend, validation: Validation, **kwargs: Any - ) -> DataFrame[Self]: - return cls._scan(io=io, validation=validation, **kwargs).collect() - # ----------------------------- THIRD-PARTY PACKAGES ----------------------------- # @classmethod From 989f8773db56f78f0c21218f0a0b549eed00d700 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 16:54:52 +0200 Subject: [PATCH 12/25] wip --- dataframely/schema.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dataframely/schema.py b/dataframely/schema.py index 762e1ff7..8d395b7f 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -1027,11 +1027,15 @@ def read_parquet_metadata_schema( The schema that was serialized to the metadata. ``None`` if no schema metadata is found or the deserialization fails. """ - metadata = pl.read_parquet_metadata(source) + try: + metadata = pl.read_parquet_metadata(source) + except plexc.ComputeError: + return None + if (schema_metadata := metadata.get(SCHEMA_METADATA_KEY)) is not None: try: return deserialize_schema(schema_metadata) - except (JSONDecodeError, plexc.ComputeError): + except JSONDecodeError: return None return None From 96e1e22d07ed4e3b1f01861adc2ff4f2d3509a90 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 17:06:49 +0200 Subject: [PATCH 13/25] fi --- dataframely/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataframely/schema.py b/dataframely/schema.py index 8d395b7f..cda11c80 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -744,7 +744,7 @@ def sink_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - return cls._sink(lf=lf, io=ParquetStorageBackend(), file=file, **kwargs) + cls._sink(lf=lf, io=ParquetStorageBackend(), file=file, **kwargs) @classmethod def read_parquet( From c92585f8460398502d5e1014e4cae1698456d9f6 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 17:08:59 +0200 Subject: [PATCH 14/25] fix --- dataframely/schema.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dataframely/schema.py b/dataframely/schema.py index cda11c80..033fecb2 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -1027,15 +1027,12 @@ def read_parquet_metadata_schema( The schema that was serialized to the metadata. ``None`` if no schema metadata is found or the deserialization fails. 
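In practice this means the helper degrades gracefully on files that were not written through dataframely. A small sketch of both outcomes; the column definition is an assumption used only to produce a schema-carrying file:

import polars as pl

import dataframely as dy


class UserSchema(dy.Schema):
    # Illustrative column definition; any schema works here.
    age = dy.Int64()


df = pl.DataFrame({"age": [1, 2, 3]})

# Written through the schema: the serialized schema rides along in the metadata.
UserSchema.write_parquet(UserSchema.validate(df, cast=True), "with_schema.parquet")
print(dy.read_parquet_metadata_schema("with_schema.parquet"))  # a Schema subclass

# Written with plain polars: no dataframely metadata, so the helper returns None.
df.write_parquet("plain.parquet")
print(dy.read_parquet_metadata_schema("plain.parquet"))  # None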
""" - try: - metadata = pl.read_parquet_metadata(source) - except plexc.ComputeError: - return None + metadata = pl.read_parquet_metadata(source) if (schema_metadata := metadata.get(SCHEMA_METADATA_KEY)) is not None: try: return deserialize_schema(schema_metadata) - except JSONDecodeError: + except (JSONDecodeError, plexc.ComputeError): return None return None From 292a8b6b2f5c73b5c582b728afebba2c2c75ee34 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Aug 2025 17:47:37 +0200 Subject: [PATCH 15/25] review --- dataframely/collection.py | 12 ++++++++---- dataframely/schema.py | 22 +++++++++++++++++----- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/dataframely/collection.py b/dataframely/collection.py index 762eb538..e675a19c 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -850,10 +850,14 @@ def _read( ) -> Self: # Utility method encapsulating the interaction with the StorageBackend - scan_function = io.scan_collection if lazy else io.read_collection - data, serialized_collection_types = scan_function( - members=cls.member_schemas().keys(), **kwargs - ) + if lazy: + data, serialized_collection_types = io.scan_collection( + members=cls.member_schemas().keys(), **kwargs + ) + else: + data, serialized_collection_types = io.read_collection( + members=cls.member_schemas().keys(), **kwargs + ) collection_types = _deserialize_types(serialized_collection_types) collection_type = _reconcile_collection_types(collection_types) diff --git a/dataframely/schema.py b/dataframely/schema.py index 033fecb2..e7f76035 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -929,12 +929,15 @@ def _read( def _read( cls, io: StorageBackend, validation: Validation, lazy: bool, **kwargs: Any ) -> LazyFrame[Self] | DataFrame[Self]: - read_function = io.scan_frame if lazy else io.read_frame - source = kwargs.pop("source") # Load - df, serialized_schema = read_function(source=source) + if lazy: + lf, serialized_schema = io.scan_frame(source=source) + else: + df, serialized_schema = io.read_frame(source=source) + lf = df.lazy() + deserialized_schema = ( deserialize_schema(serialized_schema) if serialized_schema else None ) @@ -943,8 +946,17 @@ def _read( if cls._requires_validation_for_reading_parquet( deserialized_schema, validation, source=str(source) ): - return cls.validate(df, cast=True).lazy() - return cls.cast(df) + validated = cls.validate(lf, cast=True) + if lazy: + return validated.lazy() + else: + return validated + + casted = cls.cast(lf) + if lazy: + return casted + else: + return casted.collect() # ----------------------------- THIRD-PARTY PACKAGES ----------------------------- # From 6216dd71e27208d2a5decc24aca41ba0a27f5b41 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Tue, 26 Aug 2025 10:40:44 +0200 Subject: [PATCH 16/25] coverage --- tests/collection/test_read_write_parquet.py | 9 +++++++-- tests/schema/test_read_write_parquet.py | 7 +++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/collection/test_read_write_parquet.py b/tests/collection/test_read_write_parquet.py index eedc9626..d470e0d5 100644 --- a/tests/collection/test_read_write_parquet.py +++ b/tests/collection/test_read_write_parquet.py @@ -371,6 +371,7 @@ class MyCollection2(dy.Collection): ([MyCollection], MyCollection), # One missing type, cannot be sure ([MyCollection, None], None), + ([None, MyCollection], None), # Inconsistent types, treat like no information available ([MyCollection, MyCollection2], None), ], @@ -384,11 +385,15 @@ def 
test_reconcile_collection_types( # ---------------------------------- MANUAL METADATA --------------------------------- # -def test_read_invalid_parquet_metadata_collection(tmp_path: Path) -> None: +@pytest.mark.parametrize("metadata", [None, {COLLECTION_METADATA_KEY: "invalid"}]) +def test_read_invalid_parquet_metadata_collection( + tmp_path: Path, metadata: dict | None +) -> None: # Arrange df = pl.DataFrame({"a": [1, 2, 3]}) df.write_parquet( - tmp_path / "df.parquet", metadata={COLLECTION_METADATA_KEY: "invalid"} + tmp_path / "df.parquet", + metadata=metadata, ) # Act diff --git a/tests/schema/test_read_write_parquet.py b/tests/schema/test_read_write_parquet.py index 2e944cb0..cd696ebf 100644 --- a/tests/schema/test_read_write_parquet.py +++ b/tests/schema/test_read_write_parquet.py @@ -222,10 +222,13 @@ def test_read_write_parquet_validation_skip_invalid_schema( # ---------------------------------- MANUAL METADATA --------------------------------- # -def test_read_invalid_parquet_metadata_schema(tmp_path: Path) -> None: +@pytest.mark.parametrize("metadata", [{SCHEMA_METADATA_KEY: "invalid"}, None]) +def test_read_invalid_parquet_metadata_schema( + tmp_path: Path, metadata: dict | None +) -> None: # Arrange df = pl.DataFrame({"a": [1, 2, 3]}) - df.write_parquet(tmp_path / "df.parquet", metadata={SCHEMA_METADATA_KEY: "invalid"}) + df.write_parquet(tmp_path / "df.parquet", metadata=metadata) # Act schema = dy.read_parquet_metadata_schema(tmp_path / "df.parquet") From bcb290eb40b945810b45dcfafaad5e17b8d0946f Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Thu, 28 Aug 2025 15:31:01 +0200 Subject: [PATCH 17/25] review --- dataframely/_serialization.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index 16aa15c2..7dba0f5f 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -128,15 +128,15 @@ def object_hook(self, dct: dict[str, Any]) -> Any: class StorageBackend(ABC): - """Base class for IO managers. + """Base class for storage backends. - An IO manager encapsulates a way of serializing and deserializing dataframlely + A storage backend encapsulates a way of serializing and deserializing dataframlely data-/lazyframes and collections. This base class provides a unified interface for all such use cases. - The interface is designed to operate data provided as polars frames, and metadata - provided serialized strings. This design is meant to limit the coupling between the - Schema/Collection classes and specifics of how data and metadata is stored. + The interface is designed to operate on data provided as polars frames, and metadata + provided as serialized strings. This design is meant to limit the coupling between + the Schema/Collection classes and specifics of how data and metadata is stored. 
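The parquet backend realizes this contract through the file-level key-value metadata that polars exposes, so the serialized strings travel with the data itself. A minimal sketch of that round trip; the key string mirrors SCHEMA_METADATA_KEY and the payload is a stand-in for a real serialized schema:

import polars as pl

df = pl.DataFrame({"a": [1, 2, 3]})

# Stash an arbitrary string payload in the parquet key-value metadata.
df.write_parquet(
    "table.parquet",
    metadata={"dataframely_schema": "<serialized schema would go here>"},
)

# The payload can be recovered without reading any row data.
print(pl.read_parquet_metadata("table.parquet").get("dataframely_schema"))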
""" # ----------------------------------- Schemas ------------------------------------- From a2742ee9145e14352a24e7211ce5d39ed2431eac Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Thu, 28 Aug 2025 15:32:43 +0200 Subject: [PATCH 18/25] review --- dataframely/_serialization.py | 62 ----------------------------------- 1 file changed, 62 deletions(-) diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index 7dba0f5f..2bc75da2 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -278,17 +278,6 @@ class ParquetStorageBackend(StorageBackend): def sink_frame( self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: - """This method stores frames as individual parquet files. - - Args: - lf: LazyFrame containing the data to be stored. - kwargs: The "file" kwarg is required to specify where data is stored. - It should point to a parquet file. If hive partitioning is used, - it should point to a directory. - The "metadata" kwarg is supported to pass a dictionary of parquet - metadata. - Additional keyword arguments are passed to polars. - """ file = kwargs.pop("file") metadata = kwargs.pop("metadata", {}) lf.sink_parquet( @@ -300,17 +289,6 @@ def sink_frame( def write_frame( self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any ) -> None: - """This method stores frames as individual parquet files. - - Args: - df: DataFrame containing the data to be stored. - kwargs: The "file" kwarg is required to specify where data is stored. - It should point to a parquet file. If hive partitioning is used, - it should point to a directory. - The "metadata" kwarg is supported to pass a dictionary of parquet - metadata. - Additional keyword arguments are passed to polars. - """ file = kwargs.pop("file") metadata = kwargs.pop("metadata", {}) df.write_parquet( @@ -320,24 +298,12 @@ def write_frame( ) def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: - """Lazily read single frames from parquet. - - Args: - kwargs: The "source" kwarg is required to specify where data is stored. - Other kwargs are passed to polars. - """ source = kwargs.pop("source") lf = pl.scan_parquet(source, **kwargs) metadata = _read_serialized_schema(source) return lf, metadata def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: - """Eagerly read single frames from parquet. - - Args: - kwargs: The "source" kwarg is required to specify where data is stored. - Other kwargs are passed to polars. - """ source = kwargs.pop("source") df = pl.read_parquet(source, **kwargs) metadata = _read_serialized_schema(source) @@ -351,14 +317,6 @@ def sink_collection( serialized_schemas: dict[str, str], **kwargs: Any, ) -> None: - """Stream multiple frames to parquet. - - Args: - dfs: See base class. - serialized_collection: See base class. - serialized_schemas: See base class. - kwargs: The "directory" kwarg is required to specify where data is stored. - """ path = Path(kwargs.pop("directory")) # The collection schema is serialized as part of the member parquet metadata @@ -384,14 +342,6 @@ def write_collection( serialized_schemas: dict[str, str], **kwargs: Any, ) -> None: - """Write multiple frames to parquet. - - Args: - dfs: See base class. - serialized_collection: See base class. - serialized_schemas: See base class. - kwargs: The "directory" kwarg is required to specify where data is stored. 
- """ path = Path(kwargs.pop("directory")) # The collection schema is serialized as part of the member parquet metadata @@ -413,12 +363,6 @@ def write_collection( def scan_collection( self, members: Iterable[str], **kwargs: Any ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: - """Lazily read multiple frames from parquet. - - Args: - members: See base class. - kwargs: The "directory" kwarg is required to specify where data is stored. - """ path = Path(kwargs.pop("directory")) return self._collection_from_parquet( path=path, members=members, scan=True, **kwargs @@ -427,12 +371,6 @@ def scan_collection( def read_collection( self, members: Iterable[str], **kwargs: Any ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: - """Eagerly read multiple frames from parquet. - - Args: - members: See base class. - kwargs: The "directory" kwarg is required to specify where data is stored. - """ path = Path(kwargs.pop("directory")) return self._collection_from_parquet( path=path, members=members, scan=False, **kwargs From 5a0823e1616138e20db9bd0ca93e0e9421757a46 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Thu, 28 Aug 2025 15:45:16 +0200 Subject: [PATCH 19/25] refactor --- dataframely/_serialization.py | 307 -------------------- dataframely/_storage/__init__.py | 10 + dataframely/_storage/base.py | 150 ++++++++++ dataframely/_storage/parquet.py | 172 +++++++++++ dataframely/collection.py | 5 +- dataframely/failure.py | 2 +- dataframely/schema.py | 5 +- tests/collection/test_read_write_parquet.py | 2 +- tests/schema/test_read_write_parquet.py | 2 +- tests/test_failure_info.py | 2 +- 10 files changed, 340 insertions(+), 317 deletions(-) create mode 100644 dataframely/_storage/__init__.py create mode 100644 dataframely/_storage/base.py create mode 100644 dataframely/_storage/parquet.py diff --git a/dataframely/_serialization.py b/dataframely/_serialization.py index 2bc75da2..4a60fe79 100644 --- a/dataframely/_serialization.py +++ b/dataframely/_serialization.py @@ -4,17 +4,12 @@ import base64 import datetime as dt import decimal -from abc import ABC, abstractmethod -from collections.abc import Iterable from io import BytesIO from json import JSONDecoder, JSONEncoder -from pathlib import Path from typing import Any, cast import polars as pl -SCHEMA_METADATA_KEY = "dataframely_schema" -COLLECTION_METADATA_KEY = "dataframely_collection" SERIALIZATION_FORMAT_VERSION = "1" @@ -121,305 +116,3 @@ def object_hook(self, dct: dict[str, Any]) -> Any: ) case _: raise TypeError(f"Unknown type '{dct['__type__']}' in JSON data.") - - -SerializedSchema = str -SerializedCollection = str - - -class StorageBackend(ABC): - """Base class for storage backends. - - A storage backend encapsulates a way of serializing and deserializing dataframlely - data-/lazyframes and collections. This base class provides a unified interface for - all such use cases. - - The interface is designed to operate on data provided as polars frames, and metadata - provided as serialized strings. This design is meant to limit the coupling between - the Schema/Collection classes and specifics of how data and metadata is stored. - """ - - # ----------------------------------- Schemas ------------------------------------- - @abstractmethod - def sink_frame( - self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any - ) -> None: - """Stream the contents of a dataframe, and its metadata to the storage backend. - - Args: - lf: A frame containing the data to be stored. 
- serialized_schema: String-serialized schema information. - kwargs: Additional keyword arguments to pass to the underlying storage - implementation. - """ - - @abstractmethod - def write_frame( - self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any - ) -> None: - """Write the contents of a dataframe, and its metadata to the storage backend. - - Args: - df: A dataframe containing the data to be stored. - frame: String-serialized schema information. - kwargs: Additional keyword arguments to pass to the underlying storage - implementation. - """ - - @abstractmethod - def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: - """Lazily read frame data and metadata from the storage backend. - - Args: - kwargs: Keyword arguments to pass to the underlying storage. - Refer to the individual implementation to see which keywords - are available. - Returns: - A tuple of the lazy frame data and metadata if available. - """ - - @abstractmethod - def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: - """Eagerly read frame data and metadata from the storage backend. - - Args: - kwargs: Keyword arguments to pass to the underlying storage. - Refer to the individual implementation to see which keywords - are available. - Returns: - A tuple of the lazy frame data and metadata if available. - """ - - # ------------------------------ Collections --------------------------------------- - @abstractmethod - def sink_collection( - self, - dfs: dict[str, pl.LazyFrame], - serialized_collection: SerializedCollection, - serialized_schemas: dict[str, str], - **kwargs: Any, - ) -> None: - """Stream the members of this collection into the storage backend. - - Args: - dfs: Dictionary containing the data to be stored. - serialized_collection: String-serialized information about the origin Collection. - serialized_schemas: String-serialized information about the individual Schemas - for each of the member frames. This information is also logically included - in the collection metadata, but it is passed separately here to ensure that - each member can also be read back as an individual frame. - """ - - @abstractmethod - def write_collection( - self, - dfs: dict[str, pl.LazyFrame], - serialized_collection: SerializedCollection, - serialized_schemas: dict[str, str], - **kwargs: Any, - ) -> None: - """Write the members of this collection into the storage backend. - - Args: - dfs: Dictionary containing the data to be stored. - serialized_collection: String-serialized information about the origin Collection. - serialized_schemas: String-serialized information about the individual Schemas - for each of the member frames. This information is also logically included - in the collection metadata, but it is passed separately here to ensure that - each member can also be read back as an individual frame. - """ - - @abstractmethod - def scan_collection( - self, members: Iterable[str], **kwargs: Any - ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: - """Lazily read all collection members from the storage backend. - - Args: - members: Collection member names to read. - kwargs: Additional keyword arguments to pass to the underlying storage. - Refer to the individual implementation to see which keywords are available. - Returns: - A tuple of the collection data and metadata if available. - Depending on the storage implementation, multiple copies of the metadata - may be available, which are returned as a list. 
- It is up to the caller to decide how to handle the presence/absence/consistency - of the returned values. - """ - - @abstractmethod - def read_collection( - self, members: Iterable[str], **kwargs: Any - ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: - """Lazily read all collection members from the storage backend. - - Args: - members: Collection member names to read. - kwargs: Additional keyword arguments to pass to the underlying storage. - Refer to the individual implementation to see which keywords are available. - Returns: - A tuple of the collection data and metadata if available. - Depending on the storage implementation, multiple copies of the metadata - may be available, which are returned as a list. - It is up to the caller to decide how to handle the presence/absence/consistency - of the returned values. - """ - - -class ParquetStorageBackend(StorageBackend): - """IO manager that stores data and metadata in parquet files on a file system. - - Single frames are stored as individual parquet files - - Collections are stored as directories. - """ - - # ----------------------------------- Schemas ------------------------------------- - def sink_frame( - self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any - ) -> None: - file = kwargs.pop("file") - metadata = kwargs.pop("metadata", {}) - lf.sink_parquet( - file, - metadata={**metadata, SCHEMA_METADATA_KEY: serialized_schema}, - **kwargs, - ) - - def write_frame( - self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any - ) -> None: - file = kwargs.pop("file") - metadata = kwargs.pop("metadata", {}) - df.write_parquet( - file, - metadata={**metadata, SCHEMA_METADATA_KEY: serialized_schema}, - **kwargs, - ) - - def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: - source = kwargs.pop("source") - lf = pl.scan_parquet(source, **kwargs) - metadata = _read_serialized_schema(source) - return lf, metadata - - def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: - source = kwargs.pop("source") - df = pl.read_parquet(source, **kwargs) - metadata = _read_serialized_schema(source) - return df, metadata - - # ------------------------------ Collections --------------------------------------- - def sink_collection( - self, - dfs: dict[str, pl.LazyFrame], - serialized_collection: SerializedCollection, - serialized_schemas: dict[str, str], - **kwargs: Any, - ) -> None: - path = Path(kwargs.pop("directory")) - - # The collection schema is serialized as part of the member parquet metadata - kwargs["metadata"] = kwargs.get("metadata", {}) | { - COLLECTION_METADATA_KEY: serialized_collection - } - - for key, lf in dfs.items(): - destination = ( - path / key if "partition_by" in kwargs else path / f"{key}.parquet" - ) - self.sink_frame( - lf, - serialized_schema=serialized_schemas[key], - file=destination, - **kwargs, - ) - - def write_collection( - self, - dfs: dict[str, pl.LazyFrame], - serialized_collection: SerializedCollection, - serialized_schemas: dict[str, str], - **kwargs: Any, - ) -> None: - path = Path(kwargs.pop("directory")) - - # The collection schema is serialized as part of the member parquet metadata - kwargs["metadata"] = kwargs.get("metadata", {}) | { - COLLECTION_METADATA_KEY: serialized_collection - } - - for key, lf in dfs.items(): - destination = ( - path / key if "partition_by" in kwargs else path / f"{key}.parquet" - ) - self.write_frame( - lf.collect(), - serialized_schema=serialized_schemas[key], - 
file=destination, - **kwargs, - ) - - def scan_collection( - self, members: Iterable[str], **kwargs: Any - ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: - path = Path(kwargs.pop("directory")) - return self._collection_from_parquet( - path=path, members=members, scan=True, **kwargs - ) - - def read_collection( - self, members: Iterable[str], **kwargs: Any - ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: - path = Path(kwargs.pop("directory")) - return self._collection_from_parquet( - path=path, members=members, scan=False, **kwargs - ) - - def _collection_from_parquet( - self, path: Path, members: Iterable[str], scan: bool, **kwargs: Any - ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: - # Utility method encapsulating the logic that is common - # between lazy and eager reads - data = {} - collection_types = [] - - for key in members: - if (source_path := self._member_source_path(path, key)) is not None: - data[key] = ( - pl.scan_parquet(source_path, **kwargs) - if scan - else pl.read_parquet(source_path, **kwargs).lazy() - ) - if source_path.is_file(): - collection_types.append(_read_serialized_collection(source_path)) - else: - for file in source_path.glob("**/*.parquet"): - collection_types.append(_read_serialized_collection(file)) - - # Backward compatibility: If the parquets do not have schema information, - # fall back to looking for schema.json - if not any(collection_types) and (schema_file := path / "schema.json").exists(): - collection_types.append(schema_file.read_text()) - - return data, collection_types - - @classmethod - def _member_source_path(cls, base_path: Path, name: str) -> Path | None: - if (path := base_path / name).exists() and base_path.is_dir(): - # We assume that the member is stored as a hive-partitioned dataset - return path - if (path := base_path / f"{name}.parquet").exists(): - # We assume that the member is stored as a single parquet file - return path - return None - - -def _read_serialized_collection(path: Path) -> SerializedCollection | None: - meta = pl.read_parquet_metadata(path) - return meta.get(COLLECTION_METADATA_KEY) - - -def _read_serialized_schema(path: Path) -> SerializedSchema | None: - meta = pl.read_parquet_metadata(path) - return meta.get(SCHEMA_METADATA_KEY) diff --git a/dataframely/_storage/__init__.py b/dataframely/_storage/__init__.py new file mode 100644 index 00000000..fa061350 --- /dev/null +++ b/dataframely/_storage/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) QuantCo 2025-2025 +# SPDX-License-Identifier: BSD-3-Clause + +from .base import StorageBackend +from .parquet import ParquetStorageBackend + +__all__ = [ + "ParquetStorageBackend", + "StorageBackend", +] diff --git a/dataframely/_storage/base.py b/dataframely/_storage/base.py new file mode 100644 index 00000000..c931ce48 --- /dev/null +++ b/dataframely/_storage/base.py @@ -0,0 +1,150 @@ +# Copyright (c) QuantCo 2025-2025 +# SPDX-License-Identifier: BSD-3-Clause + +from abc import ABC, abstractmethod +from collections.abc import Iterable +from typing import Any + +import polars as pl + +SerializedSchema = str +SerializedCollection = str + + +class StorageBackend(ABC): + """Base class for storage backends. + + A storage backend encapsulates a way of serializing and deserializing dataframlely + data-/lazyframes and collections. This base class provides a unified interface for + all such use cases. 
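For a feel of what an alternative implementation of this interface looks like, here is a toy in-memory sketch covering only the frame-level methods; it deliberately does not subclass the ABC, since the collection-level methods are omitted, and the "key" keyword is an invented storage address:

from typing import Any

import polars as pl


class InMemoryBackend:
    """Toy, dict-backed stand-in for the frame-level half of the interface."""

    def __init__(self) -> None:
        self._store: dict[str, tuple[pl.DataFrame, str]] = {}

    def write_frame(
        self, df: pl.DataFrame, serialized_schema: str, **kwargs: Any
    ) -> None:
        self._store[kwargs.pop("key")] = (df, serialized_schema)

    def sink_frame(
        self, lf: pl.LazyFrame, serialized_schema: str, **kwargs: Any
    ) -> None:
        # No streaming target in memory; collect and reuse the eager path.
        self.write_frame(lf.collect(), serialized_schema, **kwargs)

    def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, str | None]:
        return self._store[kwargs.pop("key")]

    def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, str | None]:
        df, schema = self.read_frame(**kwargs)
        return df.lazy(), schema


backend = InMemoryBackend()
backend.write_frame(pl.DataFrame({"a": [1]}), serialized_schema="{}", key="t")
print(backend.read_frame(key="t"))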
+ + The interface is designed to operate on data provided as polars frames, and metadata + provided as serialized strings. This design is meant to limit the coupling between + the Schema/Collection classes and specifics of how data and metadata is stored. + """ + + # ----------------------------------- Schemas ------------------------------------- + @abstractmethod + def sink_frame( + self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any + ) -> None: + """Stream the contents of a dataframe, and its metadata to the storage backend. + + Args: + lf: A frame containing the data to be stored. + serialized_schema: String-serialized schema information. + kwargs: Additional keyword arguments to pass to the underlying storage + implementation. + """ + + @abstractmethod + def write_frame( + self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any + ) -> None: + """Write the contents of a dataframe, and its metadata to the storage backend. + + Args: + df: A dataframe containing the data to be stored. + frame: String-serialized schema information. + kwargs: Additional keyword arguments to pass to the underlying storage + implementation. + """ + + @abstractmethod + def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: + """Lazily read frame data and metadata from the storage backend. + + Args: + kwargs: Keyword arguments to pass to the underlying storage. + Refer to the individual implementation to see which keywords + are available. + Returns: + A tuple of the lazy frame data and metadata if available. + """ + + @abstractmethod + def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: + """Eagerly read frame data and metadata from the storage backend. + + Args: + kwargs: Keyword arguments to pass to the underlying storage. + Refer to the individual implementation to see which keywords + are available. + Returns: + A tuple of the lazy frame data and metadata if available. + """ + + # ------------------------------ Collections --------------------------------------- + @abstractmethod + def sink_collection( + self, + dfs: dict[str, pl.LazyFrame], + serialized_collection: SerializedCollection, + serialized_schemas: dict[str, str], + **kwargs: Any, + ) -> None: + """Stream the members of this collection into the storage backend. + + Args: + dfs: Dictionary containing the data to be stored. + serialized_collection: String-serialized information about the origin Collection. + serialized_schemas: String-serialized information about the individual Schemas + for each of the member frames. This information is also logically included + in the collection metadata, but it is passed separately here to ensure that + each member can also be read back as an individual frame. + """ + + @abstractmethod + def write_collection( + self, + dfs: dict[str, pl.LazyFrame], + serialized_collection: SerializedCollection, + serialized_schemas: dict[str, str], + **kwargs: Any, + ) -> None: + """Write the members of this collection into the storage backend. + + Args: + dfs: Dictionary containing the data to be stored. + serialized_collection: String-serialized information about the origin Collection. + serialized_schemas: String-serialized information about the individual Schemas + for each of the member frames. This information is also logically included + in the collection metadata, but it is passed separately here to ensure that + each member can also be read back as an individual frame. 
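Concretely, because every member file carries its own schema string in addition to the collection string, the same parquet can serve either entry point. A sketch under assumed names; the schema, collection, and paths below are illustrative:

import dataframely as dy


class ItemSchema(dy.Schema):
    # Illustrative column; any member schema works.
    id = dy.Int64()


class ShopCollection(dy.Collection):
    items: dy.LazyFrame[ItemSchema]


# Writing the collection produces data/items.parquet whose metadata embeds both
# the collection and the ItemSchema serialization, so either read path works:
#
#   shop = ShopCollection.read_parquet("data/")             # whole collection
#   items = ItemSchema.read_parquet("data/items.parquet")   # single member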
+ """ + + @abstractmethod + def scan_collection( + self, members: Iterable[str], **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + """Lazily read all collection members from the storage backend. + + Args: + members: Collection member names to read. + kwargs: Additional keyword arguments to pass to the underlying storage. + Refer to the individual implementation to see which keywords are available. + Returns: + A tuple of the collection data and metadata if available. + Depending on the storage implementation, multiple copies of the metadata + may be available, which are returned as a list. + It is up to the caller to decide how to handle the presence/absence/consistency + of the returned values. + """ + + @abstractmethod + def read_collection( + self, members: Iterable[str], **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + """Lazily read all collection members from the storage backend. + + Args: + members: Collection member names to read. + kwargs: Additional keyword arguments to pass to the underlying storage. + Refer to the individual implementation to see which keywords are available. + Returns: + A tuple of the collection data and metadata if available. + Depending on the storage implementation, multiple copies of the metadata + may be available, which are returned as a list. + It is up to the caller to decide how to handle the presence/absence/consistency + of the returned values. + """ diff --git a/dataframely/_storage/parquet.py b/dataframely/_storage/parquet.py new file mode 100644 index 00000000..b1451da7 --- /dev/null +++ b/dataframely/_storage/parquet.py @@ -0,0 +1,172 @@ +# Copyright (c) QuantCo 2025-2025 +# SPDX-License-Identifier: BSD-3-Clause + +from collections.abc import Iterable +from pathlib import Path +from typing import Any + +import polars as pl + +from .base import SerializedCollection, SerializedSchema, StorageBackend + +SCHEMA_METADATA_KEY = "dataframely_schema" +COLLECTION_METADATA_KEY = "dataframely_collection" + + +class ParquetStorageBackend(StorageBackend): + """IO manager that stores data and metadata in parquet files on a file system. + + Single frames are stored as individual parquet files + + Collections are stored as directories. 
+ """ + + # ----------------------------------- Schemas ------------------------------------- + def sink_frame( + self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any + ) -> None: + file = kwargs.pop("file") + metadata = kwargs.pop("metadata", {}) + lf.sink_parquet( + file, + metadata={**metadata, SCHEMA_METADATA_KEY: serialized_schema}, + **kwargs, + ) + + def write_frame( + self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any + ) -> None: + file = kwargs.pop("file") + metadata = kwargs.pop("metadata", {}) + df.write_parquet( + file, + metadata={**metadata, SCHEMA_METADATA_KEY: serialized_schema}, + **kwargs, + ) + + def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]: + source = kwargs.pop("source") + lf = pl.scan_parquet(source, **kwargs) + metadata = _read_serialized_schema(source) + return lf, metadata + + def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]: + source = kwargs.pop("source") + df = pl.read_parquet(source, **kwargs) + metadata = _read_serialized_schema(source) + return df, metadata + + # ------------------------------ Collections --------------------------------------- + def sink_collection( + self, + dfs: dict[str, pl.LazyFrame], + serialized_collection: SerializedCollection, + serialized_schemas: dict[str, str], + **kwargs: Any, + ) -> None: + path = Path(kwargs.pop("directory")) + + # The collection schema is serialized as part of the member parquet metadata + kwargs["metadata"] = kwargs.get("metadata", {}) | { + COLLECTION_METADATA_KEY: serialized_collection + } + + for key, lf in dfs.items(): + destination = ( + path / key if "partition_by" in kwargs else path / f"{key}.parquet" + ) + self.sink_frame( + lf, + serialized_schema=serialized_schemas[key], + file=destination, + **kwargs, + ) + + def write_collection( + self, + dfs: dict[str, pl.LazyFrame], + serialized_collection: SerializedCollection, + serialized_schemas: dict[str, str], + **kwargs: Any, + ) -> None: + path = Path(kwargs.pop("directory")) + + # The collection schema is serialized as part of the member parquet metadata + kwargs["metadata"] = kwargs.get("metadata", {}) | { + COLLECTION_METADATA_KEY: serialized_collection + } + + for key, lf in dfs.items(): + destination = ( + path / key if "partition_by" in kwargs else path / f"{key}.parquet" + ) + self.write_frame( + lf.collect(), + serialized_schema=serialized_schemas[key], + file=destination, + **kwargs, + ) + + def scan_collection( + self, members: Iterable[str], **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + path = Path(kwargs.pop("directory")) + return self._collection_from_parquet( + path=path, members=members, scan=True, **kwargs + ) + + def read_collection( + self, members: Iterable[str], **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + path = Path(kwargs.pop("directory")) + return self._collection_from_parquet( + path=path, members=members, scan=False, **kwargs + ) + + def _collection_from_parquet( + self, path: Path, members: Iterable[str], scan: bool, **kwargs: Any + ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]: + # Utility method encapsulating the logic that is common + # between lazy and eager reads + data = {} + collection_types = [] + + for key in members: + if (source_path := self._member_source_path(path, key)) is not None: + data[key] = ( + pl.scan_parquet(source_path, **kwargs) + if scan + else 
pl.read_parquet(source_path, **kwargs).lazy() + ) + if source_path.is_file(): + collection_types.append(_read_serialized_collection(source_path)) + else: + for file in source_path.glob("**/*.parquet"): + collection_types.append(_read_serialized_collection(file)) + + # Backward compatibility: If the parquets do not have schema information, + # fall back to looking for schema.json + if not any(collection_types) and (schema_file := path / "schema.json").exists(): + collection_types.append(schema_file.read_text()) + + return data, collection_types + + @classmethod + def _member_source_path(cls, base_path: Path, name: str) -> Path | None: + if (path := base_path / name).exists() and base_path.is_dir(): + # We assume that the member is stored as a hive-partitioned dataset + return path + if (path := base_path / f"{name}.parquet").exists(): + # We assume that the member is stored as a single parquet file + return path + return None + + +def _read_serialized_collection(path: Path) -> SerializedCollection | None: + meta = pl.read_parquet_metadata(path) + return meta.get(COLLECTION_METADATA_KEY) + + +def _read_serialized_schema(path: Path) -> SerializedSchema | None: + meta = pl.read_parquet_metadata(path) + return meta.get(SCHEMA_METADATA_KEY) diff --git a/dataframely/collection.py b/dataframely/collection.py index 06b4318f..45e0526f 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -19,14 +19,13 @@ from ._filter import Filter from ._polars import FrameType from ._serialization import ( - COLLECTION_METADATA_KEY, SERIALIZATION_FORMAT_VERSION, - ParquetStorageBackend, SchemaJSONDecoder, SchemaJSONEncoder, - StorageBackend, serialization_versions, ) +from ._storage.base import StorageBackend +from ._storage.parquet import COLLECTION_METADATA_KEY, ParquetStorageBackend from ._typing import LazyFrame, Validation from .exc import ( MemberValidationError, diff --git a/dataframely/failure.py b/dataframely/failure.py index 4b6f31cf..f6eac30b 100644 --- a/dataframely/failure.py +++ b/dataframely/failure.py @@ -13,7 +13,7 @@ from dataframely._base_schema import BaseSchema -from ._serialization import SCHEMA_METADATA_KEY +from ._storage.parquet import SCHEMA_METADATA_KEY if TYPE_CHECKING: # pragma: no cover from .schema import Schema diff --git a/dataframely/schema.py b/dataframely/schema.py index c9398ddb..ac16dfd4 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -21,14 +21,13 @@ from ._compat import pa, sa from ._rule import Rule, rule_from_dict, with_evaluation_rules from ._serialization import ( - SCHEMA_METADATA_KEY, SERIALIZATION_FORMAT_VERSION, - ParquetStorageBackend, SchemaJSONDecoder, SchemaJSONEncoder, - StorageBackend, serialization_versions, ) +from ._storage.base import StorageBackend +from ._storage.parquet import SCHEMA_METADATA_KEY, ParquetStorageBackend from ._typing import DataFrame, LazyFrame, Validation from ._validation import DtypeCasting, validate_columns, validate_dtypes from .columns import Column, column_from_dict diff --git a/tests/collection/test_read_write_parquet.py b/tests/collection/test_read_write_parquet.py index d470e0d5..c8c7985b 100644 --- a/tests/collection/test_read_write_parquet.py +++ b/tests/collection/test_read_write_parquet.py @@ -12,7 +12,7 @@ from polars.testing import assert_frame_equal import dataframely as dy -from dataframely._serialization import COLLECTION_METADATA_KEY +from dataframely._storage.parquet import COLLECTION_METADATA_KEY from dataframely.collection import _reconcile_collection_types from dataframely.exc 
import ValidationRequiredError from dataframely.testing import create_collection, create_schema diff --git a/tests/schema/test_read_write_parquet.py b/tests/schema/test_read_write_parquet.py index cd696ebf..7095c5b0 100644 --- a/tests/schema/test_read_write_parquet.py +++ b/tests/schema/test_read_write_parquet.py @@ -10,7 +10,7 @@ from polars.testing import assert_frame_equal import dataframely as dy -from dataframely._serialization import SCHEMA_METADATA_KEY +from dataframely._storage.parquet import SCHEMA_METADATA_KEY from dataframely.exc import ValidationRequiredError from dataframely.testing import create_schema diff --git a/tests/test_failure_info.py b/tests/test_failure_info.py index 5deed699..d8ecdfe0 100644 --- a/tests/test_failure_info.py +++ b/tests/test_failure_info.py @@ -9,7 +9,7 @@ from polars.testing import assert_frame_equal import dataframely as dy -from dataframely._serialization import SCHEMA_METADATA_KEY +from dataframely._storage.parquet import SCHEMA_METADATA_KEY from dataframely.failure import RULE_METADATA_KEY, UNKNOWN_SCHEMA_NAME, FailureInfo From ede0fdec93aa3fb3537b84f4a2abc5ed28580bd5 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Thu, 28 Aug 2025 16:23:40 +0200 Subject: [PATCH 20/25] failure --- dataframely/_storage/__init__.py | 8 --- dataframely/_storage/base.py | 48 ++++++++++++++++ dataframely/_storage/parquet.py | 58 ++++++++++++++++++- dataframely/failure.py | 96 +++++++++++++++++++++----------- tests/test_failure_info.py | 4 +- 5 files changed, 172 insertions(+), 42 deletions(-) diff --git a/dataframely/_storage/__init__.py b/dataframely/_storage/__init__.py index fa061350..e047415f 100644 --- a/dataframely/_storage/__init__.py +++ b/dataframely/_storage/__init__.py @@ -1,10 +1,2 @@ # Copyright (c) QuantCo 2025-2025 # SPDX-License-Identifier: BSD-3-Clause - -from .base import StorageBackend -from .parquet import ParquetStorageBackend - -__all__ = [ - "ParquetStorageBackend", - "StorageBackend", -] diff --git a/dataframely/_storage/base.py b/dataframely/_storage/base.py index c931ce48..feaf7e66 100644 --- a/dataframely/_storage/base.py +++ b/dataframely/_storage/base.py @@ -9,6 +9,7 @@ SerializedSchema = str SerializedCollection = str +SerializedRules = str class StorageBackend(ABC): @@ -148,3 +149,50 @@ def read_collection( It is up to the caller to decide how to handle the presence/absence/consistency of the returned values. """ + + # ------------------------------ Failure Info -------------------------------------- + @abstractmethod + def sink_failure_info( + self, + lf: pl.LazyFrame, + serialized_rules: SerializedRules, + serialized_schema: SerializedSchema, + **kwargs: Any, + ) -> None: + """Stream the failure info to the storage backend. + + Args: + lf: LazyFrame backing the failure info. + serialized_rules: String-serialized information about the rules the + failing cases violated. + serialized_schema: String-serialized schema information. + """ + + @abstractmethod + def write_failure_info( + self, + df: pl.DataFrame, + serialized_rules: SerializedRules, + serialized_schema: SerializedSchema, + **kwargs: Any, + ) -> None: + """Write the failure info to the storage backend. + + Args: + df: DataFrame backing the failure info. + serialized_rules: String-serialized information about the rules the + failing cases violated. + serialized_schema: String-serialized schema information. 
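On the parquet side these two strings end up as file metadata stored next to the failing rows themselves. A stripped-down sketch of that layout; the key names mirror RULE_METADATA_KEY and SCHEMA_METADATA_KEY, while the column names and payloads are placeholders:

import polars as pl

failing_rows = pl.DataFrame({"a": [-1, -2], "min_check": [False, False]})

failing_rows.write_parquet(
    "failures.parquet",
    metadata={
        "dataframely_rule_columns": '["min_check"]',
        "dataframely_schema": "<serialized schema>",
    },
)

meta = pl.read_parquet_metadata("failures.parquet")
# Both keys are required to reconstruct a FailureInfo; their absence is an error.
if meta.get("dataframely_rule_columns") is None or meta.get("dataframely_schema") is None:
    raise ValueError("The parquet file does not contain the required metadata.")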
+ """ + + @abstractmethod + def scan_failure_info( + self, **kwargs: Any + ) -> tuple[pl.LazyFrame, SerializedRules, SerializedSchema]: + """Lazily read the failure info from the storage backend.""" + + @abstractmethod + def read_failure_info( + self, **kwargs: Any + ) -> tuple[pl.DataFrame, SerializedRules, SerializedSchema]: + """Read the failure info from the storage backend.""" diff --git a/dataframely/_storage/parquet.py b/dataframely/_storage/parquet.py index b1451da7..91029c9f 100644 --- a/dataframely/_storage/parquet.py +++ b/dataframely/_storage/parquet.py @@ -7,10 +7,16 @@ import polars as pl -from .base import SerializedCollection, SerializedSchema, StorageBackend +from .base import ( + SerializedCollection, + SerializedRules, + SerializedSchema, + StorageBackend, +) SCHEMA_METADATA_KEY = "dataframely_schema" COLLECTION_METADATA_KEY = "dataframely_collection" +RULE_METADATA_KEY = "dataframely_rule_columns" class ParquetStorageBackend(StorageBackend): @@ -161,6 +167,56 @@ def _member_source_path(cls, base_path: Path, name: str) -> Path | None: return path return None + # ------------------------------ Failure Info -------------------------------------- + def sink_failure_info( + self, + lf: pl.LazyFrame, + serialized_rules: SerializedRules, + serialized_schema: SerializedSchema, + **kwargs: Any, + ) -> None: + file = kwargs.pop("file") + metadata = kwargs.pop("metadata", {}) + metadata[RULE_METADATA_KEY] = serialized_rules + metadata[SCHEMA_METADATA_KEY] = serialized_schema + lf.sink_parquet(file, metadata=metadata, **kwargs) + + def write_failure_info( + self, + df: pl.DataFrame, + serialized_rules: SerializedRules, + serialized_schema: SerializedSchema, + **kwargs: Any, + ) -> None: + file = kwargs.pop("file") + metadata = kwargs.pop("metadata", {}) + metadata[RULE_METADATA_KEY] = serialized_rules + metadata[SCHEMA_METADATA_KEY] = serialized_schema + df.write_parquet(file, metadata=metadata, **kwargs) + + def scan_failure_info( + self, **kwargs: Any + ) -> tuple[pl.LazyFrame, SerializedRules, SerializedSchema]: + file = kwargs.pop("file") + metadata = pl.read_parquet_metadata(file) + schema_metadata = metadata.get(SCHEMA_METADATA_KEY) + + rule_metadata = metadata.get(RULE_METADATA_KEY) + if schema_metadata is None or rule_metadata is None: + raise ValueError("The parquet file does not contain the required metadata.") + lf = pl.scan_parquet(file, **kwargs) + return lf, rule_metadata, schema_metadata + + def read_failure_info( + self, **kwargs: Any + ) -> tuple[pl.DataFrame, SerializedRules, SerializedSchema]: + lf, rule_metadata, schema_metadata = self.scan_failure_info(**kwargs) + return ( + lf.collect(), + rule_metadata, + schema_metadata, + ) + def _read_serialized_collection(path: Path) -> SerializedCollection | None: meta = pl.read_parquet_metadata(path) diff --git a/dataframely/failure.py b/dataframely/failure.py index f6eac30b..19f4d7b6 100644 --- a/dataframely/failure.py +++ b/dataframely/failure.py @@ -13,12 +13,12 @@ from dataframely._base_schema import BaseSchema -from ._storage.parquet import SCHEMA_METADATA_KEY +from ._storage.base import StorageBackend +from ._storage.parquet import ParquetStorageBackend if TYPE_CHECKING: # pragma: no cover from .schema import Schema -RULE_METADATA_KEY = "dataframely_rule_columns" UNKNOWN_SCHEMA_NAME = "__DATAFRAMELY_UNKNOWN__" S = TypeVar("S", bound=BaseSchema) @@ -98,8 +98,7 @@ def write_parquet(self, file: str | Path | IO[bytes], **kwargs: Any) -> None: Be aware that this method suffers from the same limitations as 
:meth:`Schema.serialize`. """ - metadata, kwargs = self._build_metadata(**kwargs) - self._df.write_parquet(file, metadata=metadata, **kwargs) + self._write(ParquetStorageBackend(), file=file, **kwargs) def sink_parquet( self, file: str | Path | IO[bytes] | PartitioningScheme, **kwargs: Any @@ -118,16 +117,7 @@ def sink_parquet( Be aware that this method suffers from the same limitations as :meth:`Schema.serialize`. """ - metadata, kwargs = self._build_metadata(**kwargs) - self._lf.sink_parquet(file, metadata=metadata, **kwargs) - - def _build_metadata( - self, **kwargs: dict[str, Any] - ) -> tuple[dict[str, Any], dict[str, Any]]: - metadata = kwargs.pop("metadata", {}) - metadata[RULE_METADATA_KEY] = json.dumps(self._rule_columns) - metadata[SCHEMA_METADATA_KEY] = self.schema.serialize() - return metadata, kwargs + self._sink(ParquetStorageBackend(), file=file, **kwargs) @classmethod def read_parquet( @@ -150,7 +140,7 @@ def read_parquet( Be aware that this method suffers from the same limitations as :meth:`Schema.serialize` """ - return cls._from_parquet(source, scan=False, **kwargs) + return cls._read(io=ParquetStorageBackend(), file=source, **kwargs) @classmethod def scan_parquet( @@ -171,32 +161,76 @@ def scan_parquet( Be aware that this method suffers from the same limitations as :meth:`Schema.serialize` """ - return cls._from_parquet(source, scan=True, **kwargs) + return cls._scan(io=ParquetStorageBackend(), file=source, **kwargs) + + def _sink( + self, + io: StorageBackend, + file: str | Path | IO[bytes] | PartitioningScheme, + **kwargs: Any, + ) -> None: + io.sink_failure_info( + lf=self._lf, + serialized_rules=json.dumps(self._rule_columns), + serialized_schema=self.schema.serialize(), + file=file, + **kwargs, + ) + + def _write( + self, + io: StorageBackend, + file: str | Path | IO[bytes] | PartitioningScheme, + **kwargs: Any, + ) -> None: + io.write_failure_info( + df=self._df, + serialized_rules=json.dumps(self._rule_columns), + serialized_schema=self.schema.serialize(), + file=file, + **kwargs, + ) @classmethod - def _from_parquet( - cls, source: str | Path | IO[bytes], scan: bool, **kwargs: Any + def _scan( + cls, + io: StorageBackend, + file: str | Path | IO[bytes] | PartitioningScheme, + **kwargs: Any, ) -> FailureInfo[Schema]: from .schema import Schema, deserialize_schema - metadata = pl.read_parquet_metadata(source) - schema_metadata = metadata.get(SCHEMA_METADATA_KEY) - rule_metadata = metadata.get(RULE_METADATA_KEY) - if schema_metadata is None or rule_metadata is None: - raise ValueError("The parquet file does not contain the required metadata.") - - lf = ( - pl.scan_parquet(source, **kwargs) - if scan - else pl.read_parquet(source, **kwargs).lazy() + lf, serialized_rules, serialized_schema = io.scan_failure_info( + file=file, **kwargs ) - failure_schema = deserialize_schema(schema_metadata, strict=False) or type( + schema = deserialize_schema(serialized_schema, strict=False) or type( UNKNOWN_SCHEMA_NAME, (Schema,), {} ) return FailureInfo( lf, - json.loads(rule_metadata), - schema=failure_schema, + json.loads(serialized_rules), + schema=schema, + ) + + @classmethod + def _read( + cls, + io: StorageBackend, + file: str | Path | IO[bytes] | PartitioningScheme, + **kwargs: Any, + ) -> FailureInfo[Schema]: + from .schema import Schema, deserialize_schema + + df, serialized_rules, serialized_schema = io.scan_failure_info( + file=file, **kwargs + ) + schema = deserialize_schema(serialized_schema, strict=False) or type( + UNKNOWN_SCHEMA_NAME, (Schema,), {} + ) + return 
FailureInfo( + df, + json.loads(serialized_rules), + schema=schema, ) diff --git a/tests/test_failure_info.py b/tests/test_failure_info.py index d8ecdfe0..bfeaa27b 100644 --- a/tests/test_failure_info.py +++ b/tests/test_failure_info.py @@ -9,8 +9,8 @@ from polars.testing import assert_frame_equal import dataframely as dy -from dataframely._storage.parquet import SCHEMA_METADATA_KEY -from dataframely.failure import RULE_METADATA_KEY, UNKNOWN_SCHEMA_NAME, FailureInfo +from dataframely._storage.parquet import RULE_METADATA_KEY, SCHEMA_METADATA_KEY +from dataframely.failure import UNKNOWN_SCHEMA_NAME, FailureInfo class MySchema(dy.Schema): From 32b6098e490b7f87182e49d0a56dbf44cf2f4bb2 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Thu, 28 Aug 2025 16:32:52 +0200 Subject: [PATCH 21/25] failure --- dataframely/_storage/parquet.py | 32 +++++++++++++++++++----- dataframely/collection.py | 2 ++ dataframely/failure.py | 43 ++++++++++++--------------------- dataframely/schema.py | 3 ++- 4 files changed, 46 insertions(+), 34 deletions(-) diff --git a/dataframely/_storage/parquet.py b/dataframely/_storage/parquet.py index 91029c9f..fd36dc5b 100644 --- a/dataframely/_storage/parquet.py +++ b/dataframely/_storage/parquet.py @@ -175,11 +175,12 @@ def sink_failure_info( serialized_schema: SerializedSchema, **kwargs: Any, ) -> None: - file = kwargs.pop("file") - metadata = kwargs.pop("metadata", {}) - metadata[RULE_METADATA_KEY] = serialized_rules - metadata[SCHEMA_METADATA_KEY] = serialized_schema - lf.sink_parquet(file, metadata=metadata, **kwargs) + self._write_failure_info( + df=lf, + serialized_rules=serialized_rules, + serialized_schema=serialized_schema, + **kwargs, + ) def write_failure_info( self, @@ -187,12 +188,31 @@ def write_failure_info( serialized_rules: SerializedRules, serialized_schema: SerializedSchema, **kwargs: Any, + ) -> None: + self._write_failure_info( + df=df, + serialized_rules=serialized_rules, + serialized_schema=serialized_schema, + **kwargs, + ) + + def _write_failure_info( + self, + df: pl.DataFrame | pl.LazyFrame, + serialized_rules: SerializedRules, + serialized_schema: SerializedSchema, + **kwargs: Any, ) -> None: file = kwargs.pop("file") metadata = kwargs.pop("metadata", {}) + metadata[RULE_METADATA_KEY] = serialized_rules metadata[SCHEMA_METADATA_KEY] = serialized_schema - df.write_parquet(file, metadata=metadata, **kwargs) + + if isinstance(df, pl.DataFrame): + df.write_parquet(file, metadata=metadata, **kwargs) + else: + df.sink_parquet(file, metadata=metadata, **kwargs) def scan_failure_info( self, **kwargs: Any diff --git a/dataframely/collection.py b/dataframely/collection.py index 45e0526f..287b1876 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -893,6 +893,8 @@ def scan_parquet( **kwargs, ) + # -------------------------------- Storage --------------------------------------- # + def _write(self, io: StorageBackend, directory: Path | str, **kwargs: Any) -> None: # Utility method encapsulating the interaction with the StorageBackend diff --git a/dataframely/failure.py b/dataframely/failure.py index 19f4d7b6..7c894bde 100644 --- a/dataframely/failure.py +++ b/dataframely/failure.py @@ -140,7 +140,7 @@ def read_parquet( Be aware that this method suffers from the same limitations as :meth:`Schema.serialize` """ - return cls._read(io=ParquetStorageBackend(), file=source, **kwargs) + return cls._read(io=ParquetStorageBackend(), file=source, lazy=False, **kwargs) @classmethod def scan_parquet( @@ -161,7 +161,9 @@ def scan_parquet( Be aware 
that this method suffers from the same limitations as :meth:`Schema.serialize` """ - return cls._scan(io=ParquetStorageBackend(), file=source, **kwargs) + return cls._read(io=ParquetStorageBackend(), file=source, lazy=True, **kwargs) + + # -------------------------------- Storage --------------------------------------- # def _sink( self, @@ -191,44 +193,31 @@ def _write( **kwargs, ) - @classmethod - def _scan( - cls, - io: StorageBackend, - file: str | Path | IO[bytes] | PartitioningScheme, - **kwargs: Any, - ) -> FailureInfo[Schema]: - from .schema import Schema, deserialize_schema - - lf, serialized_rules, serialized_schema = io.scan_failure_info( - file=file, **kwargs - ) - schema = deserialize_schema(serialized_schema, strict=False) or type( - UNKNOWN_SCHEMA_NAME, (Schema,), {} - ) - return FailureInfo( - lf, - json.loads(serialized_rules), - schema=schema, - ) - @classmethod def _read( cls, io: StorageBackend, file: str | Path | IO[bytes] | PartitioningScheme, + lazy: bool, **kwargs: Any, ) -> FailureInfo[Schema]: from .schema import Schema, deserialize_schema - df, serialized_rules, serialized_schema = io.scan_failure_info( - file=file, **kwargs - ) + if lazy: + lf, serialized_rules, serialized_schema = io.scan_failure_info( + file=file, **kwargs + ) + else: + df, serialized_rules, serialized_schema = io.read_failure_info( + file=file, **kwargs + ) + lf = df.lazy() + schema = deserialize_schema(serialized_schema, strict=False) or type( UNKNOWN_SCHEMA_NAME, (Schema,), {} ) return FailureInfo( - df, + lf, json.loads(serialized_rules), schema=schema, ) diff --git a/dataframely/schema.py b/dataframely/schema.py index ac16dfd4..643942c1 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -878,7 +878,8 @@ def _requires_validation_for_reading_parquet( ) return True - # ------------------------------------- IO --------------------------------------- # + # --------------------------------- Storage -------------------------------------- # + @classmethod def _write(cls, df: pl.DataFrame, io: StorageBackend, **kwargs: Any) -> None: io.write_frame(df=df, serialized_schema=cls.serialize(), **kwargs) From 67c6fad37c17f54a00a97ca62434ab3760cc4cf4 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Thu, 28 Aug 2025 16:37:47 +0200 Subject: [PATCH 22/25] backedn --- dataframely/collection.py | 22 +++++++++++++--------- dataframely/failure.py | 22 +++++++++++++--------- dataframely/schema.py | 22 +++++++++++----------- 3 files changed, 37 insertions(+), 29 deletions(-) diff --git a/dataframely/collection.py b/dataframely/collection.py index 287b1876..ec19839a 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -820,7 +820,7 @@ def read_parquet( :meth:`serialize`. """ return cls._read( - io=ParquetStorageBackend(), + backend=ParquetStorageBackend(), validation=validation, directory=directory, lazy=False, @@ -886,7 +886,7 @@ def scan_parquet( :meth:`serialize`. 
""" return cls._read( - io=ParquetStorageBackend(), + backend=ParquetStorageBackend(), validation=validation, directory=directory, lazy=True, @@ -895,10 +895,12 @@ def scan_parquet( # -------------------------------- Storage --------------------------------------- # - def _write(self, io: StorageBackend, directory: Path | str, **kwargs: Any) -> None: + def _write( + self, backend: StorageBackend, directory: Path | str, **kwargs: Any + ) -> None: # Utility method encapsulating the interaction with the StorageBackend - io.write_collection( + backend.write_collection( self.to_dict(), serialized_collection=self.serialize(), serialized_schemas={ @@ -908,10 +910,12 @@ def _write(self, io: StorageBackend, directory: Path | str, **kwargs: Any) -> No **kwargs, ) - def _sink(self, io: StorageBackend, directory: Path | str, **kwargs: Any) -> None: + def _sink( + self, backend: StorageBackend, directory: Path | str, **kwargs: Any + ) -> None: # Utility method encapsulating the interaction with the StorageBackend - io.sink_collection( + backend.sink_collection( self.to_dict(), serialized_collection=self.serialize(), serialized_schemas={ @@ -923,16 +927,16 @@ def _sink(self, io: StorageBackend, directory: Path | str, **kwargs: Any) -> Non @classmethod def _read( - cls, io: StorageBackend, validation: Validation, lazy: bool, **kwargs: Any + cls, backend: StorageBackend, validation: Validation, lazy: bool, **kwargs: Any ) -> Self: # Utility method encapsulating the interaction with the StorageBackend if lazy: - data, serialized_collection_types = io.scan_collection( + data, serialized_collection_types = backend.scan_collection( members=cls.member_schemas().keys(), **kwargs ) else: - data, serialized_collection_types = io.read_collection( + data, serialized_collection_types = backend.read_collection( members=cls.member_schemas().keys(), **kwargs ) diff --git a/dataframely/failure.py b/dataframely/failure.py index 7c894bde..df9ef5c8 100644 --- a/dataframely/failure.py +++ b/dataframely/failure.py @@ -140,7 +140,9 @@ def read_parquet( Be aware that this method suffers from the same limitations as :meth:`Schema.serialize` """ - return cls._read(io=ParquetStorageBackend(), file=source, lazy=False, **kwargs) + return cls._read( + backend=ParquetStorageBackend(), file=source, lazy=False, **kwargs + ) @classmethod def scan_parquet( @@ -161,17 +163,19 @@ def scan_parquet( Be aware that this method suffers from the same limitations as :meth:`Schema.serialize` """ - return cls._read(io=ParquetStorageBackend(), file=source, lazy=True, **kwargs) + return cls._read( + backend=ParquetStorageBackend(), file=source, lazy=True, **kwargs + ) # -------------------------------- Storage --------------------------------------- # def _sink( self, - io: StorageBackend, + backend: StorageBackend, file: str | Path | IO[bytes] | PartitioningScheme, **kwargs: Any, ) -> None: - io.sink_failure_info( + backend.sink_failure_info( lf=self._lf, serialized_rules=json.dumps(self._rule_columns), serialized_schema=self.schema.serialize(), @@ -181,11 +185,11 @@ def _sink( def _write( self, - io: StorageBackend, + backend: StorageBackend, file: str | Path | IO[bytes] | PartitioningScheme, **kwargs: Any, ) -> None: - io.write_failure_info( + backend.write_failure_info( df=self._df, serialized_rules=json.dumps(self._rule_columns), serialized_schema=self.schema.serialize(), @@ -196,7 +200,7 @@ def _write( @classmethod def _read( cls, - io: StorageBackend, + backend: StorageBackend, file: str | Path | IO[bytes] | PartitioningScheme, lazy: bool, 
**kwargs: Any, @@ -204,11 +208,11 @@ def _read( from .schema import Schema, deserialize_schema if lazy: - lf, serialized_rules, serialized_schema = io.scan_failure_info( + lf, serialized_rules, serialized_schema = backend.scan_failure_info( file=file, **kwargs ) else: - df, serialized_rules, serialized_schema = io.read_failure_info( + df, serialized_rules, serialized_schema = backend.read_failure_info( file=file, **kwargs ) lf = df.lazy() diff --git a/dataframely/schema.py b/dataframely/schema.py index 643942c1..6a07e1fc 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -698,7 +698,7 @@ def write_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - cls._write(df=df, io=ParquetStorageBackend(), file=file, **kwargs) + cls._write(df=df, backend=ParquetStorageBackend(), file=file, **kwargs) @classmethod def sink_parquet( @@ -726,7 +726,7 @@ def sink_parquet( Be aware that this method suffers from the same limitations as :meth:`serialize`. """ - cls._sink(lf=lf, io=ParquetStorageBackend(), file=file, **kwargs) + cls._sink(lf=lf, backend=ParquetStorageBackend(), file=file, **kwargs) @classmethod def read_parquet( @@ -881,18 +881,18 @@ def _requires_validation_for_reading_parquet( # --------------------------------- Storage -------------------------------------- # @classmethod - def _write(cls, df: pl.DataFrame, io: StorageBackend, **kwargs: Any) -> None: - io.write_frame(df=df, serialized_schema=cls.serialize(), **kwargs) + def _write(cls, df: pl.DataFrame, backend: StorageBackend, **kwargs: Any) -> None: + backend.write_frame(df=df, serialized_schema=cls.serialize(), **kwargs) @classmethod - def _sink(cls, lf: pl.LazyFrame, io: StorageBackend, **kwargs: Any) -> None: - io.sink_frame(lf=lf, serialized_schema=cls.serialize(), **kwargs) + def _sink(cls, lf: pl.LazyFrame, backend: StorageBackend, **kwargs: Any) -> None: + backend.sink_frame(lf=lf, serialized_schema=cls.serialize(), **kwargs) @overload @classmethod def _read( cls, - io: StorageBackend, + backend: StorageBackend, validation: Validation, lazy: Literal[True], **kwargs: Any, @@ -902,7 +902,7 @@ def _read( @classmethod def _read( cls, - io: StorageBackend, + backend: StorageBackend, validation: Validation, lazy: Literal[False], **kwargs: Any, @@ -910,15 +910,15 @@ def _read( @classmethod def _read( - cls, io: StorageBackend, validation: Validation, lazy: bool, **kwargs: Any + cls, backend: StorageBackend, validation: Validation, lazy: bool, **kwargs: Any ) -> LazyFrame[Self] | DataFrame[Self]: source = kwargs.pop("source") # Load if lazy: - lf, serialized_schema = io.scan_frame(source=source) + lf, serialized_schema = backend.scan_frame(source=source) else: - df, serialized_schema = io.read_frame(source=source) + df, serialized_schema = backend.read_frame(source=source) lf = df.lazy() deserialized_schema = ( From 7f79f4b12bb70b45991f5dac671bf244ab03ea0e Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Thu, 28 Aug 2025 16:39:30 +0200 Subject: [PATCH 23/25] doc --- dataframely/failure.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dataframely/failure.py b/dataframely/failure.py index df9ef5c8..a710c2ad 100644 --- a/dataframely/failure.py +++ b/dataframely/failure.py @@ -175,6 +175,8 @@ def _sink( file: str | Path | IO[bytes] | PartitioningScheme, **kwargs: Any, ) -> None: + # Utility method encapsulating the interaction with the StorageBackend + backend.sink_failure_info( lf=self._lf, serialized_rules=json.dumps(self._rule_columns), @@ -189,6 +191,8 @@ def _write( 
file: str | Path | IO[bytes] | PartitioningScheme, **kwargs: Any, ) -> None: + # Utility method encapsulating the interaction with the StorageBackend + backend.write_failure_info( df=self._df, serialized_rules=json.dumps(self._rule_columns), @@ -205,6 +209,8 @@ def _read( lazy: bool, **kwargs: Any, ) -> FailureInfo[Schema]: + # Utility method encapsulating the interaction with the StorageBackend + from .schema import Schema, deserialize_schema if lazy: From 12078e8435bc5bb155437610a4059725f1b6ccef Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Thu, 28 Aug 2025 16:41:09 +0200 Subject: [PATCH 24/25] fix --- dataframely/_storage/base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dataframely/_storage/base.py b/dataframely/_storage/base.py index feaf7e66..b6ec6667 100644 --- a/dataframely/_storage/base.py +++ b/dataframely/_storage/base.py @@ -163,8 +163,8 @@ def sink_failure_info( Args: lf: LazyFrame backing the failure info. - serialized_rules: String-serialized information about the rules the - failing cases violated. + serialized_rules: JSON-serialized list of rule column names + used for validation. serialized_schema: String-serialized schema information. """ @@ -180,8 +180,8 @@ def write_failure_info( Args: df: DataFrame backing the failure info. - serialized_rules: String-serialized information about the rules the - failing cases violated. + serialized_rules: JSON-serialized list of rule column names + used for validation. serialized_schema: String-serialized schema information. """ From f68fbfcb9460bcdf0bb6b762b41d54d7c4c12079 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 29 Aug 2025 09:12:13 +0200 Subject: [PATCH 25/25] _base --- dataframely/_storage/__init__.py | 6 ++++++ dataframely/_storage/{base.py => _base.py} | 0 dataframely/_storage/parquet.py | 2 +- dataframely/collection.py | 2 +- dataframely/failure.py | 2 +- dataframely/schema.py | 2 +- 6 files changed, 10 insertions(+), 4 deletions(-) rename dataframely/_storage/{base.py => _base.py} (100%) diff --git a/dataframely/_storage/__init__.py b/dataframely/_storage/__init__.py index e047415f..f090c20e 100644 --- a/dataframely/_storage/__init__.py +++ b/dataframely/_storage/__init__.py @@ -1,2 +1,8 @@ # Copyright (c) QuantCo 2025-2025 # SPDX-License-Identifier: BSD-3-Clause + +from ._base import StorageBackend + +__all__ = [ + "StorageBackend", +] diff --git a/dataframely/_storage/base.py b/dataframely/_storage/_base.py similarity index 100% rename from dataframely/_storage/base.py rename to dataframely/_storage/_base.py diff --git a/dataframely/_storage/parquet.py b/dataframely/_storage/parquet.py index fd36dc5b..c7dd9be8 100644 --- a/dataframely/_storage/parquet.py +++ b/dataframely/_storage/parquet.py @@ -7,7 +7,7 @@ import polars as pl -from .base import ( +from ._base import ( SerializedCollection, SerializedRules, SerializedSchema, diff --git a/dataframely/collection.py b/dataframely/collection.py index ec19839a..5b3b367e 100644 --- a/dataframely/collection.py +++ b/dataframely/collection.py @@ -24,7 +24,7 @@ SchemaJSONEncoder, serialization_versions, ) -from ._storage.base import StorageBackend +from ._storage import StorageBackend from ._storage.parquet import COLLECTION_METADATA_KEY, ParquetStorageBackend from ._typing import LazyFrame, Validation from .exc import ( diff --git a/dataframely/failure.py b/dataframely/failure.py index a710c2ad..5198f470 100644 --- a/dataframely/failure.py +++ b/dataframely/failure.py @@ -13,7 +13,7 @@ from dataframely._base_schema import 
BaseSchema -from ._storage.base import StorageBackend +from ._storage import StorageBackend from ._storage.parquet import ParquetStorageBackend if TYPE_CHECKING: # pragma: no cover diff --git a/dataframely/schema.py b/dataframely/schema.py index 6a07e1fc..42c023f8 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -26,7 +26,7 @@ SchemaJSONEncoder, serialization_versions, ) -from ._storage.base import StorageBackend +from ._storage import StorageBackend from ._storage.parquet import SCHEMA_METADATA_KEY, ParquetStorageBackend from ._typing import DataFrame, LazyFrame, Validation from ._validation import DtypeCasting, validate_columns, validate_dtypes
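
Throughout the series, the parquet backend carries the serialized schema (and, for failure info, the serialized rule columns) as parquet key-value metadata next to the data, and reads it back with `pl.read_parquet_metadata`. A minimal round-trip of that mechanism looks roughly like the sketch below; the key names are placeholders for the real `SCHEMA_METADATA_KEY`/`RULE_METADATA_KEY` constants in `dataframely/_storage/parquet.py`, and a polars version that accepts the `metadata` argument on `write_parquet` is assumed.

```python
import json

import polars as pl

SCHEMA_KEY = "dataframely_schema"       # placeholder for SCHEMA_METADATA_KEY
RULES_KEY = "dataframely_rule_columns"  # placeholder for RULE_METADATA_KEY

df = pl.DataFrame({"a": [1, 2, 3], "primary_key": [True, False, True]})

# Write the data and attach the serialized schema / rule columns as
# file-level key-value metadata.
df.write_parquet(
    "failure.parquet",
    metadata={
        SCHEMA_KEY: '{"name": "MySchema"}',      # stand-in for Schema.serialize()
        RULES_KEY: json.dumps(["primary_key"]),  # stand-in for the rule column list
    },
)

# Read the data and the metadata back independently, as the backend does.
data = pl.read_parquet("failure.parquet")
meta = pl.read_parquet_metadata("failure.parquet")
rule_columns = json.loads(meta[RULES_KEY])  # -> ["primary_key"]
```

Reading the key-value metadata only touches the parquet footer, which is why the backend can fetch the serialized schema without materializing the frame itself.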
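
The `_scan`/`_read` consolidation in `failure.py` (mirrored by the `lazy` flag and overloads in `schema.py` and `collection.py`) rests on normalizing both code paths to a `LazyFrame`. Roughly, with plain parquet calls standing in for the `StorageBackend` hooks:

```python
import polars as pl


def load_frame(path: str, lazy: bool) -> pl.LazyFrame:
    """Sketch of the lazy/eager dispatch used by the unified _read methods."""
    if lazy:
        lf = pl.scan_parquet(path)         # deferred: nothing is read yet
    else:
        lf = pl.read_parquet(path).lazy()  # eager read, wrapped only to unify the type
    # Everything downstream (schema deserialization, FailureInfo construction)
    # is then written once against the LazyFrame.
    return lf
```

In the eager branch the data is already materialized, so the trailing `.lazy()` is purely a type-level normalization; a later `collect()` on it is cheap.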
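
Renaming the `io` parameters to `backend` and re-exporting `StorageBackend` from `dataframely._storage` makes the extension point explicit: a new storage format only has to implement the hooks that the `_write`/`_sink`/`_read` helpers call. The sketch below is hypothetical and deliberately incomplete — it does not subclass the real ABC in `dataframely/_storage/_base.py`, which also declares sink/scan, collection and failure-info methods — but it shows the shape of the two frame-level hooks used by `Schema._write` and `Schema._read`:

```python
import polars as pl


class InMemoryBackend:
    """Hypothetical backend that keeps frames and their serialized schema in a dict."""

    def __init__(self) -> None:
        self._store: dict[str, tuple[pl.DataFrame, str]] = {}

    def write_frame(self, df: pl.DataFrame, serialized_schema: str, *, file: str) -> None:
        # Schema._write hands over the frame plus the already-serialized schema;
        # the backend decides how to persist both together.
        self._store[file] = (df, serialized_schema)

    def read_frame(self, *, source: str) -> tuple[pl.DataFrame, str | None]:
        # Schema._read (lazy=False) expects the frame back together with whatever
        # serialized schema was stored alongside it.
        df, schema = self._store[source]
        return df, schema
```

The `file`/`source` keyword asymmetry mirrors how the parquet call sites pass their arguments; whether such custom backends become public API is not decided by these patches.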