diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 178d698f8d..ca410a9eb4 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -406,11 +406,16 @@ def reorder_levels(self, ids: typing.Sequence[str]): level_names = [self.col_id_to_index_name[index_id] for index_id in ids] return Block(self.expr, ids, self.column_labels, level_names) - def _to_dataframe(self, result) -> pd.DataFrame: + def _to_dataframe( + self, result, dtype_backend: Literal["default", "pyarrow"] = "default" + ) -> pd.DataFrame: """Convert BigQuery data to pandas DataFrame with specific dtypes.""" + dtypes = dict(zip(self.index_columns, self.index.dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) - return self.session._rows_to_dataframe(result, dtypes) + return self.session._rows_to_dataframe( + result, dtypes, dtype_backend=dtype_backend + ) def to_pandas( self, @@ -419,6 +424,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + dtype_backend: Literal["default", "pyarrow"] = "default", ) -> Tuple[pd.DataFrame, bigquery.QueryJob]: """Run query and download results as a pandas DataFrame.""" if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS): @@ -438,7 +444,8 @@ def to_pandas( df, query_job = self._materialize_local( materialize_options=MaterializationOptions( downsampling=sampling, ordered=ordered - ) + ), + dtype_backend=dtype_backend, ) df.set_axis(self.column_labels, axis=1, copy=False) return df, query_job @@ -478,7 +485,9 @@ def _copy_index_to_pandas(self, df: pd.DataFrame): df.columns = self.column_labels def _materialize_local( - self, materialize_options: MaterializationOptions = MaterializationOptions() + self, + materialize_options: MaterializationOptions = MaterializationOptions(), + dtype_backend: Literal["default", "pyarrow"] = "default", ) -> Tuple[pd.DataFrame, bigquery.QueryJob]: """Run query and download results as a pandas DataFrame. 
Return the total number of results as well.""" # TODO(swast): Allow for dry run and timeout. @@ -530,7 +539,10 @@ def _materialize_local( ) else: total_rows = results_iterator.total_rows - df = self._to_dataframe(results_iterator) + df = self._to_dataframe( + results_iterator, + dtype_backend=dtype_backend, + ) self._copy_index_to_pandas(df) return df, query_job diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 4e447c547f..0fb77e9569 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1038,6 +1038,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + dtype_backend: Literal["default", "pyarrow"] = "default", ) -> pandas.DataFrame: """Write DataFrame to pandas DataFrame. @@ -1060,6 +1061,11 @@ def to_pandas( ordered (bool, default True): Determines whether the resulting pandas dataframe will be deterministically ordered. In some cases, unordered may result in a faster-executing query. + dtype_backend (str, default "default"): + Controls dtypes returns. Options include: + + * ``"default"``: a mix of dtypes, optimizing correctness and compatibility. + * ``"pyarrow"``: pyarrow-backed ArrowDtype for all columns. Returns: pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the @@ -1073,6 +1079,7 @@ def to_pandas( sampling_method=sampling_method, random_state=random_state, ordered=ordered, + dtype_backend=dtype_backend, ) self._set_internal_query_job(query_job) return df.set_axis(self._block.column_labels, axis=1, copy=False) diff --git a/bigframes/series.py b/bigframes/series.py index 5f6cfe9893..97c362acd9 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -295,6 +295,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + dtype_backend: Literal["default", "pyarrow"] = "default", ) -> pandas.Series: """Writes Series to pandas Series. 
@@ -317,6 +318,11 @@ def to_pandas( ordered (bool, default True): Determines whether the resulting pandas series will be deterministically ordered. In some cases, unordered may result in a faster-executing query. + dtype_backend (str, default "default"): + Controls dtypes returns. Options include: + + * ``"default"``: a mix of dtypes, optimizing correctness and compatibility. + * ``"pyarrow"``: pyarrow-backed ArrowDtype for all columns. Returns: @@ -328,6 +334,7 @@ def to_pandas( sampling_method=sampling_method, random_state=random_state, ordered=ordered, + dtype_backend=dtype_backend, ) self._set_internal_query_job(query_job) series = df.squeeze(axis=1) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 5732d4b08e..559561950a 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1822,10 +1822,24 @@ def _get_table_size(self, destination_table): return table.num_bytes def _rows_to_dataframe( - self, row_iterator: bigquery.table.RowIterator, dtypes: Dict + self, + row_iterator: bigquery.table.RowIterator, + dtypes: Dict, + dtype_backend: Literal["default", "pyarrow"] = "default", ) -> pandas.DataFrame: arrow_table = row_iterator.to_arrow() - return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes) + + if dtype_backend == "default": + return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes) + elif dtype_backend == "pyarrow": + return pandas.DataFrame( + { + name: pandas.Series(value, dtype=pandas.ArrowDtype(value.type)) + for name, value in zip(arrow_table.column_names, arrow_table) + } + ) + else: + raise ValueError(f"got unexpected dtype_backend={repr(dtype_backend)}") def _start_generic_job(self, job: formatting_helpers.GenericJob): if bigframes.options.display.progress_bar is not None: diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py index 789426a6e3..88e0f6d3f0 100644 --- a/bigframes/session/_io/pandas.py +++ b/bigframes/session/_io/pandas.py 
@pytest.fixture(scope="session")
def scalars_pandas_df_index_pyarrow() -> pd.DataFrame:
    """Scalars test data as a pyarrow-backed pandas DataFrame.

    Reads ``scalars.jsonl`` from ``DATA_DIR`` with ``dtype_backend="pyarrow"``,
    uses ``rowindex`` as the index and sorts by it. The fixture is skipped
    when the installed pandas version string starts with ``"1."``.
    """
    running_pandas_1x = pd.__version__.startswith("1.")
    if running_pandas_1x:
        pytest.skip("dtype_backend='pyarrow' not supported in pandas 1.x")

    scalars = pd.read_json(
        DATA_DIR / "scalars.jsonl",
        lines=True,
        dtype_backend="pyarrow",
    )
    indexed = scalars.set_index("rowindex")
    return indexed.sort_index()