From 6d930587994e4500d7d12620f4f992bf0cc3f32f Mon Sep 17 00:00:00 2001 From: Christian Bourjau Date: Mon, 23 Oct 2023 20:01:48 +0200 Subject: [PATCH 1/4] Add example of a sklearn like pipeline --- .../examples/03_sklearn_like_pipeline.py | 127 ++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 spec/API_specification/examples/03_sklearn_like_pipeline.py diff --git a/spec/API_specification/examples/03_sklearn_like_pipeline.py b/spec/API_specification/examples/03_sklearn_like_pipeline.py new file mode 100644 index 00000000..46f570ff --- /dev/null +++ b/spec/API_specification/examples/03_sklearn_like_pipeline.py @@ -0,0 +1,127 @@ +""" +This is an example of how a (possibly) lazy data frame may be used +in a sklearn-like pipeline. + +The example is motivated by the prospect of a fully lazy, ONNX-based +data frame implementation. The concept is that calls to `fit` are +eager. They compute some state that is later meant to be transferred +into the ONNX graph/model. That transfer happens when calling +`transform`. The logic within the `transform` methods is "traced" +lazily and the resulting lazy object is then exported to ONNX. +""" +from __future__ import annotations + +from typing import Any, TYPE_CHECKING, Self + +from dataframe_api.dataframe_object import DataFrame + +#: Dummy type alias for a standard compliant array object +Array = Any + + +class Scaler: + """Apply a standardization scaling factor to `column_names`.""" + scalings_: dict[str, float] + + def __init__(self, column_names: list[str]): + self.column_names = column_names + + def fit(self, df: DataFrame) -> Self: + """Compute scaling factors from given data frame. + + Calling this function requires collecting values. + """ + self.scalings_ = {} + for column_name in df.column_names: + if not column_name in self.column_names: + continue + self.scalings_[column_name] = df.get_column_by_name(column_name).std() + + return self + + def transform(self, df: DataFrame) -> DataFrame: + """Apply the "trained" scaling values. + + This function is guaranteed to not collect values. + """ + df = copy_df(df) + for column_name in df.column_names: + if not column_name in self.column_names: + continue + column = df.get_column_by_name(column_name) / self.scalings_[column_name] + df.assign(column) + + return df + +class FeatureSelector: + """Limit columns to those seen in training including their order.""" + + def fit(self, df: DataFrame) -> Self: + """Record the observed columns and their order. + + This function is guaranteed to not collect values. + """ + self.columns_ = df.column_names + return self + + def transform(self, df: DataFrame) -> DataFrame: + """Select and sort the columns as observed in training. + + This function is guaranteed to not collect values. + """ + # FIXME: Does this ensure column order? + return df.select(self.columns_) + + +class Pipeline: + """Linear pipeline of transformers.""" + + def __init__(self, steps: list[Any]): + self.steps = steps + + def fit(self, df: DataFrame) -> Self: + """Call filt on the steps of the pipeline subsequently. + + Calling this function may trigger a collection. + """ + for step in self.steps: + step.fit(df) + + self.steps_ = self.steps + return self + + def transform(self, df: DataFrame) -> DataFrame: + """Call transform on all steps of this pipeline subsequently. + + This function is guaranteed to not trigger a collection. + """ + for step in self.steps_: + df = step.transform(df) + + return df + + +def copy_df(df: DataFrame): + """Create a copy of `df`. 
+ + This is done by converting a data frame into standard-arrays and + assembling them back into a new data frame. + """ + dfx = df.__dataframe_namespace__() + + dct = dataframe_to_dict_of_arrays(df) + return dfx.dataframe_from_dict( + # FIXME: This would require some kind of dtype mapping? + {column_name: dfx.column_from_1d_array(arr, dtype=arr.dtype) for column_name, arr in dct.items()} + ) + + +def dataframe_to_dict_of_arrays(df: DataFrame) -> dict[str, Array]: + """Convert the given data frame into an dictionary of standard arrays. """ + dct = {} + dfx = df.__dataframe_namespace__() + for column_name in df.column_names: + column = df.get_column_by_name(column_name) + dct[column_name] = column.to_array_object(column.dtype) + + return dct From 42d454a7d14ee30371771ef0ca21493c667740ec Mon Sep 17 00:00:00 2001 From: Christian Bourjau Date: Tue, 24 Oct 2023 10:19:37 +0200 Subject: [PATCH 2/4] Review comments --- .../examples/03_sklearn_like_pipeline.py | 51 +++++-------------- 1 file changed, 14 insertions(+), 37 deletions(-) diff --git a/spec/API_specification/examples/03_sklearn_like_pipeline.py b/spec/API_specification/examples/03_sklearn_like_pipeline.py index 46f570ff..9eca579a 100644 --- a/spec/API_specification/examples/03_sklearn_like_pipeline.py +++ b/spec/API_specification/examples/03_sklearn_like_pipeline.py @@ -15,9 +15,6 @@ from dataframe_api.dataframe_object import DataFrame -#: Dummy type alias for a standard compliant array object -Array = Any - class Scaler: """Apply a standardization scaling factor to `column_names`.""" @@ -31,11 +28,16 @@ def fit(self, df: DataFrame) -> Self: Calling this function requires collecting values. """ - self.scalings_ = {} - for column_name in df.column_names: - if not column_name in self.column_names: - continue - self.scalings_[column_name] = df.get_column_by_name(column_name).std() + scalings = df.select(self.column_names).std() + if hasattr(scalings, 'collect'): + scalings = scalings.collect() + + self.scalings_ = { + # Clarify: Should the return type of `get_value` (lazy / + # eager scalar) depend on a previous call to `collect`? + column_name: scalings.get_column_by_name(column_name).get_value(0) + for column_name in self.column_names + } return self @@ -44,12 +46,12 @@ def transform(self, df: DataFrame) -> DataFrame: This function is guaranteed to not collect values. """ - df = copy_df(df) for column_name in df.column_names: if not column_name in self.column_names: continue column = df.get_column_by_name(column_name) / self.scalings_[column_name] - df.assign(column) + # Note: `assign` is not in-place + df = df.assign(column) return df @@ -69,7 +71,7 @@ def transform(self, df: DataFrame) -> DataFrame: This function is guaranteed to not collect values. """ - # FIXME: Does this ensure column order? + # Note: This assumes that select ensures the column order. return df.select(self.columns_) @@ -80,7 +82,7 @@ def __init__(self, steps: list[Any]): self.steps = steps def fit(self, df: DataFrame) -> Self: - """Call filt on the steps of the pipeline subsequently. + """Call fit on the steps of the pipeline subsequently. Calling this function may trigger a collection. """ @@ -100,28 +102,3 @@ def transform(self, df: DataFrame) -> DataFrame: return df - -def copy_df(df: DataFrame): - """Create a copy of `df`. - - This is done by converting a data frame into standard-arrays and - assembling them back into a new data frame. 
- """ - dfx = df.__dataframe_namespace__() - - dct = dataframe_to_dict_of_arrays(df) - return dfx.dataframe_from_dict( - # FIXME: This would require some kind of dtype mapping? - {column_name: dfx.column_from_1d_array(arr, dtype=arr.dtype) for column_name, arr in dct.items()} - ) - - -def dataframe_to_dict_of_arrays(df: DataFrame) -> dict[str, Array]: - """Convert the given data frame into an dictionary of standard arrays. """ - dct = {} - dfx = df.__dataframe_namespace__() - for column_name in df.column_names: - column = df.get_column_by_name(column_name) - dct[column_name] = column.to_array_object(column.dtype) - - return dct From 33c39c833064d2b2169ee682877f2512ce471418 Mon Sep 17 00:00:00 2001 From: Christian Bourjau Date: Wed, 25 Oct 2023 11:45:09 +0200 Subject: [PATCH 3/4] Update comment after discussion in #279 --- spec/API_specification/examples/03_sklearn_like_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/API_specification/examples/03_sklearn_like_pipeline.py b/spec/API_specification/examples/03_sklearn_like_pipeline.py index 9eca579a..dd1d049c 100644 --- a/spec/API_specification/examples/03_sklearn_like_pipeline.py +++ b/spec/API_specification/examples/03_sklearn_like_pipeline.py @@ -33,8 +33,8 @@ def fit(self, df: DataFrame) -> Self: scalings = scalings.collect() self.scalings_ = { - # Clarify: Should the return type of `get_value` (lazy / - # eager scalar) depend on a previous call to `collect`? + # Note: `get_value` returns an implemenation-defined, + # duck-typed scalar which may be lazy. column_name: scalings.get_column_by_name(column_name).get_value(0) for column_name in self.column_names } From c87ebf4baef5a15754a600d2a7f0062809db6954 Mon Sep 17 00:00:00 2001 From: MarcoGorelli <33491632+MarcoGorelli@users.noreply.github.com> Date: Mon, 4 Dec 2023 16:05:45 +0000 Subject: [PATCH 4/4] update --- ...ipeline.py => 07_sklearn_like_pipeline.py} | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) rename spec/API_specification/examples/{03_sklearn_like_pipeline.py => 07_sklearn_like_pipeline.py} (71%) diff --git a/spec/API_specification/examples/03_sklearn_like_pipeline.py b/spec/API_specification/examples/07_sklearn_like_pipeline.py similarity index 71% rename from spec/API_specification/examples/03_sklearn_like_pipeline.py rename to spec/API_specification/examples/07_sklearn_like_pipeline.py index dd1d049c..6dc60c74 100644 --- a/spec/API_specification/examples/03_sklearn_like_pipeline.py +++ b/spec/API_specification/examples/07_sklearn_like_pipeline.py @@ -1,4 +1,5 @@ -""" +"""Scikit-learn-like pipeline. + This is an example of how a (possibly) lazy data frame may be used in a sklearn-like pipeline. @@ -8,34 +9,36 @@ into the ONNX graph/model. That transfer happens when calling `transform`. The logic within the `transform` methods is "traced" lazily and the resulting lazy object is then exported to ONNX. 
-""" +""" from __future__ import annotations -from typing import Any, TYPE_CHECKING, Self +from typing import TYPE_CHECKING, Any -from dataframe_api.dataframe_object import DataFrame +if TYPE_CHECKING: + from typing_extensions import Self + + from dataframe_api.typing import Column, DataFrame, Scalar class Scaler: - """Apply a standardization scaling factor to `column_names`.""" - scalings_: dict[str, float] + """Apply a standardization scaling factor to `column_names`.""" + + scalings_: dict[str, Scalar] - def __init__(self, column_names: list[str]): + def __init__(self, column_names: list[str]) -> None: self.column_names = column_names def fit(self, df: DataFrame) -> Self: """Compute scaling factors from given data frame. - Calling this function requires collecting values. + A typical data science workflow is to fit on one dataset, + and then transform and multiple datasets. Therefore, we + make sure to `persist` within the `fit` method. """ - scalings = df.select(self.column_names).std() - if hasattr(scalings, 'collect'): - scalings = scalings.collect() + scalings = df.select(*self.column_names).std().persist() self.scalings_ = { - # Note: `get_value` returns an implemenation-defined, - # duck-typed scalar which may be lazy. - column_name: scalings.get_column_by_name(column_name).get_value(0) + column_name: scalings.col(column_name).get_value(0) for column_name in self.column_names } @@ -46,14 +49,16 @@ def transform(self, df: DataFrame) -> DataFrame: This function is guaranteed to not collect values. """ + columns: list[Column] = [] for column_name in df.column_names: - if not column_name in self.column_names: + if column_name not in self.column_names: continue - column = df.get_column_by_name(column_name) / self.scalings_[column_name] - # Note: `assign` is not in-place - df = df.assign(column) + column = df.col(column_name) / self.scalings_[column_name] + columns.append(column) + + # Note: `assign` is not in-place + return df.assign(*columns) - return df class FeatureSelector: """Limit columns to those seen in training including their order.""" @@ -72,15 +77,15 @@ def transform(self, df: DataFrame) -> DataFrame: This function is guaranteed to not collect values. """ # Note: This assumes that select ensures the column order. - return df.select(self.columns_) + return df.select(*self.columns_) class Pipeline: """Linear pipeline of transformers.""" - def __init__(self, steps: list[Any]): + def __init__(self, steps: list[Any]) -> None: self.steps = steps - + def fit(self, df: DataFrame) -> Self: """Call fit on the steps of the pipeline subsequently. @@ -101,4 +106,3 @@ def transform(self, df: DataFrame) -> DataFrame: df = step.transform(df) return df -