
Commit 108f4d2

feat: Use read api for some peek ops (#1731)
1 parent 476b7dd commit 108f4d2
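
This change lets the read-API executor serve peek() requests directly: plans that reduce to a plain table scan are streamed through the BigQuery Storage Read API and truncated client-side, so no query job is issued. When the caller does not require ordering, order_by ops are first baked out of the plan so that more plans qualify for the table-scan fast path.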

File tree: 2 files changed, +40 −6 lines changed

bigframes/session/read_api_execution.py

Lines changed: 26 additions & 6 deletions

@@ -18,7 +18,7 @@
 from google.cloud import bigquery_storage_v1
 import pyarrow as pa
 
-from bigframes.core import bigframe_node, rewrite
+from bigframes.core import bigframe_node, nodes, pyarrow_utils, rewrite
 from bigframes.session import executor, semi_executor
 
 
@@ -39,14 +39,11 @@ def execute(
         ordered: bool,
         peek: Optional[int] = None,
     ) -> Optional[executor.ExecuteResult]:
-        node = rewrite.try_reduce_to_table_scan(plan)
+        node = self._try_adapt_plan(plan, ordered)
         if not node:
             return None
         if node.explicitly_ordered and ordered:
             return None
-        if peek:
-            # TODO: Support peeking
-            return None
 
         import google.cloud.bigquery_storage_v1.types as bq_storage_types
         from google.protobuf import timestamp_pb2
@@ -92,16 +89,39 @@ def execute(
 
         def process_page(page):
             pa_batch = page.to_arrow()
+            pa_batch = pa_batch.select(
+                [item.source_id for item in node.scan_list.items]
+            )
             return pa.RecordBatch.from_arrays(
                 pa_batch.columns, names=[id.sql for id in node.ids]
             )
 
         batches = map(process_page, rowstream.pages)
 
+        if peek:
+            batches = pyarrow_utils.truncate_pyarrow_iterable(batches, max_results=peek)
+
+        rows = node.source.n_rows
+        if peek and rows:
+            rows = min(peek, rows)
+
         return executor.ExecuteResult(
             arrow_batches=batches,
             schema=plan.schema,
             query_job=None,
             total_bytes=None,
-            total_rows=node.source.n_rows,
+            total_rows=rows,
         )
+
+    def _try_adapt_plan(
+        self,
+        plan: bigframe_node.BigFrameNode,
+        ordered: bool,
+    ) -> Optional[nodes.ReadTableNode]:
+        """
+        Tries to simplify the plan to an equivalent single ReadTableNode. Otherwise, returns None.
+        """
+        if not ordered:
+            # gets rid of order_by ops
+            plan = rewrite.bake_order(plan)
+        return rewrite.try_reduce_to_table_scan(plan)
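
For readers unfamiliar with the new helper: pyarrow_utils.truncate_pyarrow_iterable caps the total number of rows yielded from a stream of Arrow record batches. A minimal sketch of such a helper, assuming it slices the final batch so the limit is hit exactly (the actual bigframes implementation may differ):

from typing import Iterable, Iterator

import pyarrow as pa


def truncate_pyarrow_iterable_sketch(
    batches: Iterable[pa.RecordBatch], max_results: int
) -> Iterator[pa.RecordBatch]:
    # Hypothetical sketch, not the bigframes implementation.
    # Yield batches until max_results rows have been produced in total,
    # slicing the final batch so the row count lands exactly on the limit.
    remaining = max_results
    for batch in batches:
        if remaining <= 0:
            return
        if batch.num_rows >= remaining:
            yield batch.slice(0, remaining)
            return
        remaining -= batch.num_rows
        yield batch

Because the truncation happens on the client as batches stream in, a peek over a huge table stops reading once enough rows have arrived rather than materializing the full scan.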

tests/system/large/test_dataframe_io.py

Lines changed: 14 additions & 0 deletions

@@ -29,6 +29,20 @@ def test_to_pandas_batches_raise_when_large_result_not_allowed(session):
         next(df.to_pandas_batches(page_size=500, max_results=1500))
 
 
+def test_large_df_peek_no_job(session):
+    execution_count_before = session._metrics.execution_count
+
+    # only works with null index, as sequential index requires row_number over full table scan.
+    df = session.read_gbq(
+        WIKIPEDIA_TABLE, index_col=bigframes.enums.DefaultIndexKind.NULL
+    )
+    result = df.peek(50)
+    execution_count_after = session._metrics.execution_count
+
+    assert len(result) == 50
+    assert execution_count_after == execution_count_before
+
+
 def test_to_pandas_batches_override_global_option(
     session,
 ):
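
From user code, the fast path the test exercises looks roughly like this (the public table name below is an assumption for illustration, not the WIKIPEDIA_TABLE constant from the test suite):

import bigframes
import bigframes.pandas as bpd

# Hypothetical table reference; substitute any large table you can read.
df = bpd.read_gbq(
    "bigquery-public-data.samples.wikipedia",
    index_col=bigframes.enums.DefaultIndexKind.NULL,  # null index avoids a row_number scan
)
preview = df.peek(50)  # rows arrive via the Storage Read API; no query job is started

The null index matters because a sequential default index would require a row_number computation over the whole table, which cannot be expressed as a plain table scan and would force a query job.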
