From 1754db0eaaa15b6aeb91890df1b98bd6fc5cda0c Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 14 May 2025 00:01:32 +0000 Subject: [PATCH 1/4] feat: Use read api for some peek ops --- bigframes/session/read_api_execution.py | 26 +++++++++++++++++++------ tests/system/large/test_dataframe_io.py | 14 +++++++++++++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py index 46d55eb303..4b79c7ab71 100644 --- a/bigframes/session/read_api_execution.py +++ b/bigframes/session/read_api_execution.py @@ -18,7 +18,7 @@ from google.cloud import bigquery_storage_v1 import pyarrow as pa -from bigframes.core import bigframe_node, rewrite +from bigframes.core import bigframe_node, nodes, pyarrow_utils, rewrite from bigframes.session import executor, semi_executor @@ -39,14 +39,11 @@ def execute( ordered: bool, peek: Optional[int] = None, ) -> Optional[executor.ExecuteResult]: - node = rewrite.try_reduce_to_table_scan(plan) + node = self._try_adapt_plan(plan, ordered) if not node: return None if node.explicitly_ordered and ordered: return None - if peek: - # TODO: Support peeking - return None import google.cloud.bigquery_storage_v1.types as bq_storage_types from google.protobuf import timestamp_pb2 @@ -98,10 +95,27 @@ def process_page(page): batches = map(process_page, rowstream.pages) + if peek: + batches = pyarrow_utils.truncate_pyarrow_iterable(batches, max_results=peek) + + rows = node.source.n_rows + if peek and rows: + rows = min(peek, rows) + return executor.ExecuteResult( arrow_batches=batches, schema=plan.schema, query_job=None, total_bytes=None, - total_rows=node.source.n_rows, + total_rows=rows, ) + + def _try_adapt_plan( + self, + plan: bigframe_node.BigFrameNode, + ordered: bool, + ) -> Optional[nodes.ReadTableNode]: + if not ordered: + # gets rid of order_by ops + plan = rewrite.bake_order(plan) + return rewrite.try_reduce_to_table_scan(plan) diff --git a/tests/system/large/test_dataframe_io.py b/tests/system/large/test_dataframe_io.py index b10e361129..87d2acd34b 100644 --- a/tests/system/large/test_dataframe_io.py +++ b/tests/system/large/test_dataframe_io.py @@ -29,6 +29,20 @@ def test_to_pandas_batches_raise_when_large_result_not_allowed(session): next(df.to_pandas_batches(page_size=500, max_results=1500)) +def test_large_df_peek_no_job(session): + execution_count_before = session._metrics.execution_count + + # only works with null index, as sequential index requires row_number over full table scan. + df = session.read_gbq( + WIKIPEDIA_TABLE, index_col=bigframes.enums.DefaultIndexKind.NULL + ) + result = df.peek(50) + execution_count_after = session._metrics.execution_count + + assert len(result) == 50 + assert execution_count_after == execution_count_before + + def test_to_pandas_batches_override_global_option( session, ): From 5ca45c01a2f2391d17e5ad2ba186f781190d0567 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 14 May 2025 01:11:46 +0000 Subject: [PATCH 2/4] fix read api column order bug --- bigframes/session/read_api_execution.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py index 4b79c7ab71..1f4f159910 100644 --- a/bigframes/session/read_api_execution.py +++ b/bigframes/session/read_api_execution.py @@ -89,9 +89,9 @@ def execute( def process_page(page): pa_batch = page.to_arrow() - return pa.RecordBatch.from_arrays( - pa_batch.columns, names=[id.sql for id in node.ids] - ) + return pa_batch.select( + [item.source_id for item in node.scan_list.items] + ).rename_columns([id.sql for id in node.ids]) batches = map(process_page, rowstream.pages) From 5af3dff0c65e2cca06ad13372fb6f0c0221f96ae Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 14 May 2025 21:55:43 +0000 Subject: [PATCH 3/4] make compatible with old pyarrow --- bigframes/session/read_api_execution.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py index 1f4f159910..d5d7c3fb6a 100644 --- a/bigframes/session/read_api_execution.py +++ b/bigframes/session/read_api_execution.py @@ -89,9 +89,12 @@ def execute( def process_page(page): pa_batch = page.to_arrow() - return pa_batch.select( + pa_batch = pa_batch.select( [item.source_id for item in node.scan_list.items] - ).rename_columns([id.sql for id in node.ids]) + ) + return pa.RecordBatch.from_arrays( + pa_batch.columns, names=[id.sql for id in node.ids] + ) batches = map(process_page, rowstream.pages) From 190ae0eb4ee7ea1b9471202257528d6255d29704 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 15 May 2025 19:45:34 +0000 Subject: [PATCH 4/4] add _try_adapt_plan docstring --- bigframes/session/read_api_execution.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py index d5d7c3fb6a..9384a40fbe 100644 --- a/bigframes/session/read_api_execution.py +++ b/bigframes/session/read_api_execution.py @@ -118,6 +118,9 @@ def _try_adapt_plan( plan: bigframe_node.BigFrameNode, ordered: bool, ) -> Optional[nodes.ReadTableNode]: + """ + Tries to simplify the plan to an equivalent single ReadTableNode. Otherwise, returns None. + """ if not ordered: # gets rid of order_by ops plan = rewrite.bake_order(plan)