diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py
index 46d55eb303..9384a40fbe 100644
--- a/bigframes/session/read_api_execution.py
+++ b/bigframes/session/read_api_execution.py
@@ -18,7 +18,7 @@
 from google.cloud import bigquery_storage_v1
 import pyarrow as pa
 
-from bigframes.core import bigframe_node, rewrite
+from bigframes.core import bigframe_node, nodes, pyarrow_utils, rewrite
 from bigframes.session import executor, semi_executor
 
 
@@ -39,14 +39,11 @@ def execute(
         ordered: bool,
         peek: Optional[int] = None,
     ) -> Optional[executor.ExecuteResult]:
-        node = rewrite.try_reduce_to_table_scan(plan)
+        node = self._try_adapt_plan(plan, ordered)
         if not node:
             return None
         if node.explicitly_ordered and ordered:
             return None
-        if peek:
-            # TODO: Support peeking
-            return None
 
         import google.cloud.bigquery_storage_v1.types as bq_storage_types
         from google.protobuf import timestamp_pb2
@@ -92,16 +89,39 @@ def execute(
 
         def process_page(page):
             pa_batch = page.to_arrow()
+            pa_batch = pa_batch.select(
+                [item.source_id for item in node.scan_list.items]
+            )
             return pa.RecordBatch.from_arrays(
                 pa_batch.columns, names=[id.sql for id in node.ids]
             )
 
         batches = map(process_page, rowstream.pages)
+        if peek:
+            batches = pyarrow_utils.truncate_pyarrow_iterable(batches, max_results=peek)
+
+        rows = node.source.n_rows
+        if peek and rows:
+            rows = min(peek, rows)
+
         return executor.ExecuteResult(
             arrow_batches=batches,
             schema=plan.schema,
             query_job=None,
             total_bytes=None,
-            total_rows=node.source.n_rows,
+            total_rows=rows,
         )
+
+    def _try_adapt_plan(
+        self,
+        plan: bigframe_node.BigFrameNode,
+        ordered: bool,
+    ) -> Optional[nodes.ReadTableNode]:
+        """
+        Tries to simplify the plan to an equivalent single ReadTableNode. Otherwise, returns None.
+        """
+        if not ordered:
+            # gets rid of order_by ops
+            plan = rewrite.bake_order(plan)
+        return rewrite.try_reduce_to_table_scan(plan)
diff --git a/tests/system/large/test_dataframe_io.py b/tests/system/large/test_dataframe_io.py
index b10e361129..87d2acd34b 100644
--- a/tests/system/large/test_dataframe_io.py
+++ b/tests/system/large/test_dataframe_io.py
@@ -29,6 +29,20 @@ def test_to_pandas_batches_raise_when_large_result_not_allowed(session):
         next(df.to_pandas_batches(page_size=500, max_results=1500))
 
 
+def test_large_df_peek_no_job(session):
+    execution_count_before = session._metrics.execution_count
+
+    # only works with null index, as sequential index requires row_number over full table scan.
+    df = session.read_gbq(
+        WIKIPEDIA_TABLE, index_col=bigframes.enums.DefaultIndexKind.NULL
+    )
+    result = df.peek(50)
+    execution_count_after = session._metrics.execution_count
+
+    assert len(result) == 50
+    assert execution_count_after == execution_count_before
+
+
 def test_to_pandas_batches_override_global_option(
     session,
 ):
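For reference, a minimal sketch of the row-truncation behaviour the new peek path relies on. This is not the actual `bigframes.core.pyarrow_utils.truncate_pyarrow_iterable` implementation; the helper name `truncate_batches` is hypothetical and only illustrates how an iterable of Arrow record batches can be capped at `max_results` rows, which is what lets `peek` stream results straight from the Storage Read API without issuing a query job:

```python
from typing import Iterable, Iterator

import pyarrow as pa


def truncate_batches(
    batches: Iterable[pa.RecordBatch], max_results: int
) -> Iterator[pa.RecordBatch]:
    """Illustrative sketch: yield batches until max_results rows have been emitted."""
    remaining = max_results
    for batch in batches:
        if remaining <= 0:
            return
        if batch.num_rows <= remaining:
            # The whole batch fits within the remaining row budget.
            yield batch
            remaining -= batch.num_rows
        else:
            # Slice the final batch so the total row count equals max_results.
            yield batch.slice(0, remaining)
            return
```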