From 57baa34ee055b6ebd8fb7e82ca8f18428560bbd0 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 20 Mar 2024 22:08:39 +0000
Subject: [PATCH 1/2] feat: add dtype_backend=pyarrow to to_pandas

---
 bigframes/core/blocks.py           | 24 ++++++++++++++++++------
 bigframes/dataframe.py             |  7 +++++++
 bigframes/series.py                |  9 ++++++++-
 bigframes/session/__init__.py      | 18 ++++++++++++++++--
 bigframes/session/_io/pandas.py    |  3 ++-
 setup.py                           |  3 +--
 tests/system/conftest.py           | 11 +++++++++++
 tests/system/small/test_groupby.py | 12 +++++++-----
 8 files changed, 70 insertions(+), 17 deletions(-)

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 0ebbe48cc4..395a3fc58f 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -26,7 +26,7 @@
 import itertools
 import random
 import typing
-from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
+from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple, Union
 import warnings
 
 import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
@@ -415,11 +415,16 @@ def reorder_levels(self, ids: typing.Sequence[str]):
         level_names = [self.col_id_to_index_name[index_id] for index_id in ids]
         return Block(self.expr, ids, self.column_labels, level_names)
 
-    def _to_dataframe(self, result) -> pd.DataFrame:
+    def _to_dataframe(
+        self, result, dtype_backend: Union[None, Literal["pyarrow"]] = None
+    ) -> pd.DataFrame:
         """Convert BigQuery data to pandas DataFrame with specific dtypes."""
+
         dtypes = dict(zip(self.index_columns, self.index.dtypes))
         dtypes.update(zip(self.value_columns, self.dtypes))
-        return self.session._rows_to_dataframe(result, dtypes)
+        return self.session._rows_to_dataframe(
+            result, dtypes, dtype_backend=dtype_backend
+        )
 
     def to_pandas(
         self,
@@ -428,6 +433,7 @@
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
+        dtype_backend: Union[None, Literal["pyarrow"]] = None,
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame."""
         if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
@@ -447,7 +453,8 @@
             df, query_job = self._materialize_local(
                 materialize_options=MaterializationOptions(
                     downsampling=sampling, ordered=ordered
-                )
+                ),
+                dtype_backend=dtype_backend,
             )
             df.set_axis(self.column_labels, axis=1, copy=False)
             return df, query_job
@@ -486,7 +493,9 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):
         df.index.names = self.index.names  # type: ignore
 
     def _materialize_local(
-        self, materialize_options: MaterializationOptions = MaterializationOptions()
+        self,
+        materialize_options: MaterializationOptions = MaterializationOptions(),
+        dtype_backend: Union[None, Literal["pyarrow"]] = None,
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
@@ -538,7 +547,10 @@ def _materialize_local(
             )
         else:
             total_rows = results_iterator.total_rows
-            df = self._to_dataframe(results_iterator)
+            df = self._to_dataframe(
+                results_iterator,
+                dtype_backend=dtype_backend,
+            )
             self._copy_index_to_pandas(df)
 
         return df, query_job
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 0f99a3e4db..ebd01da1f7 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -1032,6 +1032,7 @@ def to_pandas(
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
+        dtype_backend: Union[None, Literal["pyarrow"]] = None,
     ) -> pandas.DataFrame:
         """Write DataFrame to pandas DataFrame.
 
@@ -1054,6 +1055,11 @@
             ordered (bool, default True):
                 Determines whether the resulting pandas dataframe will be deterministically ordered.
                 In some cases, unordered may result in a faster-executing query.
+            dtype_backend (str, default None):
+                Controls the dtypes returned. Options include:
+
+                * ``None``: a mix of dtypes, optimizing for correctness and compatibility.
+                * ``"pyarrow"``: pyarrow-backed ArrowDtype for all columns.
 
         Returns:
             pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the
@@ -1066,6 +1072,7 @@
             sampling_method=sampling_method,
             random_state=random_state,
             ordered=ordered,
+            dtype_backend=dtype_backend,
         )
         self._set_internal_query_job(query_job)
         return df.set_axis(self._block.column_labels, axis=1, copy=False)
diff --git a/bigframes/series.py b/bigframes/series.py
index 6128238057..f46b59bc1b 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -21,7 +21,7 @@
 import numbers
 import textwrap
 import typing
-from typing import Any, Mapping, Optional, Tuple, Union
+from typing import Any, Literal, Mapping, Optional, Tuple, Union
 
 import bigframes_vendored.pandas.core.series as vendored_pandas_series
 import google.cloud.bigquery as bigquery
@@ -289,6 +289,7 @@ def to_pandas(
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
+        dtype_backend: Union[None, Literal["pyarrow"]] = None,
     ) -> pandas.Series:
         """Writes Series to pandas Series.
 
@@ -311,6 +312,11 @@
             ordered (bool, default True):
                 Determines whether the resulting pandas series will be
                 deterministically ordered. In some cases, unordered may result
                 in a faster-executing query.
+            dtype_backend (str, default None):
+                Controls the dtypes returned. Options include:
+
+                * ``None``: a mix of dtypes, optimizing for correctness and compatibility.
+                * ``"pyarrow"``: pyarrow-backed ArrowDtype for all columns.
 
         Returns:
@@ -322,6 +328,7 @@
             sampling_method=sampling_method,
             random_state=random_state,
             ordered=ordered,
+            dtype_backend=dtype_backend,
         )
         self._set_internal_query_job(query_job)
         series = df.squeeze(axis=1)
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 4cb3c11859..58578430b8 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -1788,10 +1788,24 @@ def _get_table_size(self, destination_table):
         return table.num_bytes
 
     def _rows_to_dataframe(
-        self, row_iterator: bigquery.table.RowIterator, dtypes: Dict
+        self,
+        row_iterator: bigquery.table.RowIterator,
+        dtypes: Dict,
+        dtype_backend: Union[None, Literal["pyarrow"]] = None,
     ) -> pandas.DataFrame:
         arrow_table = row_iterator.to_arrow()
-        return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
+
+        if dtype_backend is None:
+            return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
+        elif dtype_backend == "pyarrow":
+            return pandas.DataFrame(
+                {
+                    name: pandas.Series(value, dtype=pandas.ArrowDtype(value.type))
+                    for name, value in zip(arrow_table.column_names, arrow_table)
+                }
+            )
+        else:
+            raise ValueError(f"got unexpected dtype_backend={repr(dtype_backend)}")
 
     def _start_generic_job(self, job: formatting_helpers.GenericJob):
         if bigframes.options.display.progress_bar is not None:
diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py
index 789426a6e3..88e0f6d3f0 100644
--- a/bigframes/session/_io/pandas.py
+++ b/bigframes/session/_io/pandas.py
@@ -49,7 +49,8 @@ def _arrow_to_pandas_arrowdtype(
 
 
 def arrow_to_pandas(
-    arrow_table: Union[pyarrow.Table, pyarrow.RecordBatch], dtypes: Dict
+    arrow_table: Union[pyarrow.Table, pyarrow.RecordBatch],
+    dtypes: Dict,
 ):
     if len(dtypes) != arrow_table.num_columns:
         raise ValueError(
diff --git a/setup.py b/setup.py
index 5258a7d6f9..6fb47147dc 100644
--- a/setup.py
+++ b/setup.py
@@ -45,8 +45,7 @@
     "google-cloud-resource-manager >=1.10.3",
     "google-cloud-storage >=2.0.0",
     "ibis-framework[bigquery] >=8.0.0,<9.0.0dev",
-    # TODO: Relax upper bound once we have fixed `system_prerelease` tests.
- "pandas >=1.5.0,<2.1.4", + "pandas >=1.5.0", "pydata-google-auth >=1.8.2", "requests >=2.27.1", "scikit-learn >=1.2.2", diff --git a/tests/system/conftest.py b/tests/system/conftest.py index e6b241c9a3..f5038a2569 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -424,6 +424,17 @@ def scalars_pandas_df_index( return scalars_pandas_df_default_index.set_index("rowindex").sort_index() +@pytest.fixture(scope="session") +def scalars_pandas_df_index_pyarrow() -> pd.DataFrame: + """pd.DataFrame pointing at test data.""" + + if pd.__version__.startswith("1."): + pytest.skip("dtype_backend='pyarrow' not supported in pandas 1.x") + + df = pd.read_json(DATA_DIR / "scalars.jsonl", lines=True, dtype_backend="pyarrow") + return df.set_index("rowindex").sort_index() + + @pytest.fixture(scope="session") def scalars_pandas_df_multi_index( scalars_pandas_df_default_index: pd.DataFrame, diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index b38dcaf5d1..6976277ecc 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -240,12 +240,14 @@ def test_dataframe_groupby_multi_sum( ], ) def test_dataframe_groupby_analytic( - scalars_df_index, scalars_pandas_df_index, operator + scalars_df_index, scalars_pandas_df_index_pyarrow, operator ): - col_names = ["float64_col", "int64_col", "bool_col", "string_col"] - bf_result = operator(scalars_df_index[col_names].groupby("string_col")) - pd_result = operator(scalars_pandas_df_index[col_names].groupby("string_col")) - bf_result_computed = bf_result.to_pandas() + df_bf = scalars_df_index + df_pd = scalars_pandas_df_index_pyarrow + col_names = ["float64_col", "int64_col", "bool_col", "int64_too"] + bf_result = operator(df_bf[col_names].groupby("int64_too")) + pd_result = operator(df_pd[col_names].groupby("int64_too")) + bf_result_computed = bf_result.to_pandas(dtype_backend="pyarrow") pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) From 65c724c495108320c2e9d185e26104b18c215d1e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 21 Mar 2024 15:49:58 +0000 Subject: [PATCH 2/2] use default as default --- bigframes/core/blocks.py | 8 ++++---- bigframes/dataframe.py | 6 +++--- bigframes/series.py | 6 +++--- bigframes/session/__init__.py | 4 ++-- setup.py | 3 ++- tests/system/small/test_groupby.py | 12 +++++------- 6 files changed, 19 insertions(+), 20 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 760def4dc8..ca410a9eb4 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -26,7 +26,7 @@ import itertools import random import typing -from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple, Union +from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple import warnings import google.cloud.bigquery as bigquery @@ -407,7 +407,7 @@ def reorder_levels(self, ids: typing.Sequence[str]): return Block(self.expr, ids, self.column_labels, level_names) def _to_dataframe( - self, result, dtype_backend: Union[None, Literal["pyarrow"]] = None + self, result, dtype_backend: Literal["default", "pyarrow"] = "default" ) -> pd.DataFrame: """Convert BigQuery data to pandas DataFrame with specific dtypes.""" @@ -424,7 +424,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, - dtype_backend: Union[None, Literal["pyarrow"]] = None, + dtype_backend: Literal["default", "pyarrow"] = "default", ) -> Tuple[pd.DataFrame, bigquery.QueryJob]: """Run query and 
         if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
@@ -487,7 +487,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):
     def _materialize_local(
         self,
         materialize_options: MaterializationOptions = MaterializationOptions(),
-        dtype_backend: Union[None, Literal["pyarrow"]] = None,
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 123a5505b4..0fb77e9569 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -1038,7 +1038,7 @@ def to_pandas(
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
-        dtype_backend: Union[None, Literal["pyarrow"]] = None,
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> pandas.DataFrame:
         """Write DataFrame to pandas DataFrame.
 
@@ -1061,10 +1061,10 @@ def to_pandas(
             ordered (bool, default True):
                 Determines whether the resulting pandas dataframe will be deterministically ordered.
                 In some cases, unordered may result in a faster-executing query.
-            dtype_backend (str, default None):
+            dtype_backend (str, default "default"):
                 Controls the dtypes returned. Options include:
 
-                * ``None``: a mix of dtypes, optimizing for correctness and compatibility.
+                * ``"default"``: a mix of dtypes, optimizing for correctness and compatibility.
                 * ``"pyarrow"``: pyarrow-backed ArrowDtype for all columns.
 
         Returns:
diff --git a/bigframes/series.py b/bigframes/series.py
index 2c476fcacf..97c362acd9 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -295,7 +295,7 @@ def to_pandas(
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
-        dtype_backend: Union[None, Literal["pyarrow"]] = None,
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> pandas.Series:
         """Writes Series to pandas Series.
 
@@ -318,10 +318,10 @@ def to_pandas(
             ordered (bool, default True):
                 Determines whether the resulting pandas series will be
                 deterministically ordered. In some cases, unordered may result
                 in a faster-executing query.
-            dtype_backend (str, default None):
+            dtype_backend (str, default "default"):
                 Controls the dtypes returned. Options include:
 
-                * ``None``: a mix of dtypes, optimizing for correctness and compatibility.
+                * ``"default"``: a mix of dtypes, optimizing for correctness and compatibility.
                 * ``"pyarrow"``: pyarrow-backed ArrowDtype for all columns.
 
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index b13970eb9e..559561950a 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -1825,11 +1825,11 @@ def _rows_to_dataframe(
         self,
         row_iterator: bigquery.table.RowIterator,
         dtypes: Dict,
-        dtype_backend: Union[None, Literal["pyarrow"]] = None,
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> pandas.DataFrame:
         arrow_table = row_iterator.to_arrow()
 
-        if dtype_backend is None:
+        if dtype_backend == "default":
             return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
         elif dtype_backend == "pyarrow":
             return pandas.DataFrame(
diff --git a/setup.py b/setup.py
index 6fb47147dc..5258a7d6f9 100644
--- a/setup.py
+++ b/setup.py
@@ -45,7 +45,8 @@
     "google-cloud-resource-manager >=1.10.3",
     "google-cloud-storage >=2.0.0",
     "ibis-framework[bigquery] >=8.0.0,<9.0.0dev",
-    "pandas >=1.5.0",
+    # TODO: Relax upper bound once we have fixed `system_prerelease` tests.
+ "pandas >=1.5.0,<2.1.4", "pydata-google-auth >=1.8.2", "requests >=2.27.1", "scikit-learn >=1.2.2", diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index 6976277ecc..b38dcaf5d1 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -240,14 +240,12 @@ def test_dataframe_groupby_multi_sum( ], ) def test_dataframe_groupby_analytic( - scalars_df_index, scalars_pandas_df_index_pyarrow, operator + scalars_df_index, scalars_pandas_df_index, operator ): - df_bf = scalars_df_index - df_pd = scalars_pandas_df_index_pyarrow - col_names = ["float64_col", "int64_col", "bool_col", "int64_too"] - bf_result = operator(df_bf[col_names].groupby("int64_too")) - pd_result = operator(df_pd[col_names].groupby("int64_too")) - bf_result_computed = bf_result.to_pandas(dtype_backend="pyarrow") + col_names = ["float64_col", "int64_col", "bool_col", "string_col"] + bf_result = operator(scalars_df_index[col_names].groupby("string_col")) + pd_result = operator(scalars_pandas_df_index[col_names].groupby("string_col")) + bf_result_computed = bf_result.to_pandas() pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)