Commit 2bc4fbc

perf: Optimize repr for unordered gbq table (#1778)
1 parent d6b7ab4 commit 2bc4fbc

File tree: 6 files changed (+88 −29 lines)

bigframes/core/rewrite/__init__.py
Lines changed: 2 additions & 1 deletion

@@ -22,7 +22,7 @@
     try_reduce_to_local_scan,
     try_reduce_to_table_scan,
 )
-from bigframes.core.rewrite.slices import pull_up_limits, rewrite_slice
+from bigframes.core.rewrite.slices import pull_out_limit, pull_up_limits, rewrite_slice
 from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions
 from bigframes.core.rewrite.windows import rewrite_range_rolling
 
@@ -32,6 +32,7 @@
     "rewrite_slice",
     "rewrite_timedelta_expressions",
     "pull_up_limits",
+    "pull_out_limit",
     "remap_variables",
     "defer_order",
     "column_pruning",

bigframes/core/rewrite/slices.py
Lines changed: 4 additions & 4 deletions

@@ -26,7 +26,7 @@
 
 
 def pull_up_limits(root: nodes.ResultNode) -> nodes.ResultNode:
-    new_child, pulled_limit = _pullup_slice_inner(root.child)
+    new_child, pulled_limit = pull_out_limit(root.child)
     if new_child == root.child:
         return root
     elif pulled_limit is None:

@@ -37,7 +37,7 @@ def pull_up_limits(root: nodes.ResultNode) -> nodes.ResultNode:
         return dataclasses.replace(root, child=new_child, limit=new_limit)
 
 
-def _pullup_slice_inner(
+def pull_out_limit(
     root: nodes.BigFrameNode,
 ) -> Tuple[nodes.BigFrameNode, Optional[int]]:
     """

@@ -53,15 +53,15 @@ def _pullup_slice_inner(
         assert root.step == 1
         assert root.stop is not None
         limit = root.stop
-        new_root, prior_limit = _pullup_slice_inner(root.child)
+        new_root, prior_limit = pull_out_limit(root.child)
         if (prior_limit is not None) and (prior_limit < limit):
             limit = prior_limit
         return new_root, limit
     elif (
         isinstance(root, (nodes.SelectionNode, nodes.ProjectionNode))
         and root.row_preserving
     ):
-        new_child, prior_limit = _pullup_slice_inner(root.child)
+        new_child, prior_limit = pull_out_limit(root.child)
         if prior_limit is not None:
             return root.transform_children(lambda _: new_child), prior_limit
     # Most ops don't support pulling up slice, like filter, agg, join, etc.
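The rewrite above is easier to see on a toy tree: a limit is hoisted through row-preserving nodes, and nested limits compose by taking the minimum. A minimal sketch, assuming stand-in node classes (Scan, Select, Limit are illustrative, not the real bigframes nodes):

# Hypothetical toy model of the pull_out_limit rewrite above; node names
# (Limit, Select, Scan) are illustrative, not the real bigframes classes.
from __future__ import annotations

import dataclasses
from typing import Optional, Tuple, Union


@dataclasses.dataclass(frozen=True)
class Scan:
    table: str


@dataclasses.dataclass(frozen=True)
class Select:  # row-preserving projection
    child: "Node"
    columns: tuple


@dataclasses.dataclass(frozen=True)
class Limit:
    child: "Node"
    stop: int


Node = Union[Scan, Select, Limit]


def pull_out_limit(root: Node) -> Tuple[Node, Optional[int]]:
    """Strip Limit nodes out of the tree, returning the tightest limit seen."""
    if isinstance(root, Limit):
        new_child, prior = pull_out_limit(root.child)
        # Nested limits compose by taking the minimum.
        return new_child, root.stop if prior is None else min(root.stop, prior)
    if isinstance(root, Select):  # row-preserving: safe to hoist a limit past it
        new_child, prior = pull_out_limit(root.child)
        return dataclasses.replace(root, child=new_child), prior
    return root, None  # filters, joins, aggs, etc. would stop the pull-up


plan = Limit(Select(Limit(Scan("t"), stop=500), columns=("a",)), stop=100)
print(pull_out_limit(plan))  # (Select(child=Scan(table='t'), ...), 100)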

bigframes/core/tree_properties.py
Lines changed: 6 additions & 19 deletions

@@ -45,26 +45,13 @@ def can_fast_head(node: nodes.BigFrameNode) -> bool:
     # To do fast head operation:
     # (1) the underlying data must be arranged/indexed according to the logical ordering
     # (2) transformations must support pushing down LIMIT or a filter on row numbers
-    return has_fast_offset_address(node) or has_fast_offset_address(node)
-
-
-def has_fast_orderby_limit(node: nodes.BigFrameNode) -> bool:
-    """True iff ORDER BY LIMIT can be performed without a large full table scan."""
-    # TODO: In theory compatible with some Slice nodes, potentially by adding OFFSET
-    if isinstance(node, nodes.LeafNode):
-        return node.fast_ordered_limit
-    if isinstance(node, (nodes.ProjectionNode, nodes.SelectionNode)):
-        return has_fast_orderby_limit(node.child)
-    return False
-
-
-def has_fast_offset_address(node: nodes.BigFrameNode) -> bool:
-    """True iff specific offsets can be scanned without a large full table scan."""
-    # TODO: In theory can push offset lookups through slice operators by translating indices
-    if isinstance(node, nodes.LeafNode):
-        return node.fast_offsets
+    if isinstance(node, nodes.ReadLocalNode):
+        # always cheap to push slice into local data
+        return True
+    if isinstance(node, nodes.ReadTableNode):
+        return (node.source.ordering is None) or (node.fast_ordered_limit)
     if isinstance(node, (nodes.ProjectionNode, nodes.SelectionNode)):
-        return has_fast_offset_address(node.child)
+        return can_fast_head(node.child)
     return False
 
 
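In words: after this change an unordered ReadTableNode qualifies for fast head, because when no logical ordering was requested the first N rows in storage order are acceptable. A standalone sketch of the decision, assuming hypothetical stand-in types rather than bigframes' real node classes:

# Sketch of the can_fast_head decision after this change, using stand-in
# types; ReadLocal/ReadTable/Select here are hypothetical, not bigframes'.
from __future__ import annotations

import dataclasses
from typing import Optional, Union


@dataclasses.dataclass(frozen=True)
class ReadLocal:  # in-memory data: a head is always cheap
    pass


@dataclasses.dataclass(frozen=True)
class ReadTable:
    ordering: Optional[str]  # logical ordering requested on the source, if any
    fast_ordered_limit: bool  # physical layout already matches that ordering


@dataclasses.dataclass(frozen=True)
class Select:  # row-preserving op: delegate to the child
    child: "Node"


Node = Union[ReadLocal, ReadTable, Select]


def can_fast_head(node: Node) -> bool:
    if isinstance(node, ReadLocal):
        return True
    if isinstance(node, ReadTable):
        # Unordered scans can take the first N rows in storage order;
        # ordered scans qualify only if the physical order already matches.
        return node.ordering is None or node.fast_ordered_limit
    if isinstance(node, Select):
        return can_fast_head(node.child)
    return False


print(can_fast_head(Select(ReadTable(ordering=None, fast_ordered_limit=False))))  # True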

bigframes/session/loader.py
Lines changed: 5 additions & 0 deletions

@@ -387,6 +387,7 @@ def read_gbq_table(  # type: ignore[overload-overlap]
         enable_snapshot: bool = ...,
         dry_run: Literal[False] = ...,
         force_total_order: Optional[bool] = ...,
+        n_rows: Optional[int] = None,
     ) -> dataframe.DataFrame:
         ...
 
@@ -408,6 +409,7 @@ def read_gbq_table(
         enable_snapshot: bool = ...,
         dry_run: Literal[True] = ...,
         force_total_order: Optional[bool] = ...,
+        n_rows: Optional[int] = None,
     ) -> pandas.Series:
         ...
 
@@ -428,6 +430,7 @@ def read_gbq_table(
         enable_snapshot: bool = True,
         dry_run: bool = False,
         force_total_order: Optional[bool] = None,
+        n_rows: Optional[int] = None,
     ) -> dataframe.DataFrame | pandas.Series:
         import bigframes._tools.strings
         import bigframes.dataframe as dataframe

@@ -618,6 +621,7 @@ def read_gbq_table(
             at_time=time_travel_timestamp if enable_snapshot else None,
             primary_key=primary_key,
             session=self._session,
+            n_rows=n_rows,
         )
         # if we don't have a unique index, we order by row hash if we are in strict mode
         if (

@@ -852,6 +856,7 @@ def read_gbq_query(
             columns=columns,
             use_cache=configuration["query"]["useQueryCache"],
             force_total_order=force_total_order,
+            n_rows=query_job.result().total_rows,
             # max_results and filters are omitted because they are already
             # handled by to_query(), above.
         )
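The n_rows value threaded through read_gbq_query comes from job metadata that is already available once the query finishes, so recording it costs nothing extra. A minimal sketch with the google-cloud-bigquery client (the project and table are just the public sample dataset used in the tests below):

# Sketch: a completed query job already reports its row count on the
# result iterator, so capturing n_rows requires no additional query.
from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 300"
)
rows = query_job.result()  # waits for completion, returns a row iterator
print(rows.total_rows)  # 300: row count from job metadata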

bigframes/session/read_api_execution.py
Lines changed: 19 additions & 5 deletions

@@ -39,12 +39,17 @@ def execute(
         ordered: bool,
         peek: Optional[int] = None,
     ) -> Optional[executor.ExecuteResult]:
-        node = self._try_adapt_plan(plan, ordered)
-        if not node:
+        adapt_result = self._try_adapt_plan(plan, ordered)
+        if not adapt_result:
             return None
+        node, limit = adapt_result
         if node.explicitly_ordered and ordered:
             return None
 
+        if limit is not None:
+            if peek is None or limit < peek:
+                peek = limit
+
         import google.cloud.bigquery_storage_v1.types as bq_storage_types
         from google.protobuf import timestamp_pb2
 

@@ -117,11 +122,20 @@ def _try_adapt_plan(
         self,
         plan: bigframe_node.BigFrameNode,
         ordered: bool,
-    ) -> Optional[nodes.ReadTableNode]:
+    ) -> Optional[tuple[nodes.ReadTableNode, Optional[int]]]:
         """
-        Tries to simplify the plan to an equivalent single ReadTableNode. Otherwise, returns None.
+        Tries to simplify the plan to an equivalent single ReadTableNode and a limit. Otherwise, returns None.
         """
+        plan, limit = rewrite.pull_out_limit(plan)
+        # bake_order does not allow slice ops
+        plan = plan.bottom_up(rewrite.rewrite_slice)
         if not ordered:
             # gets rid of order_by ops
             plan = rewrite.bake_order(plan)
-        return rewrite.try_reduce_to_table_scan(plan)
+        read_table_node = rewrite.try_reduce_to_table_scan(plan)
+        if read_table_node is None:
+            return None
+        if (limit is not None) and (read_table_node.source.ordering is not None):
+            # read api can only use physical ordering to limit, not a logical ordering
+            return None
+        return (read_table_node, limit)
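The peek-capping logic in execute reduces to: a limit pulled out of the plan can only tighten the requested row cap, never loosen it. A hypothetical helper (cap_peek is not in the codebase) distilling that rule:

# Hypothetical helper distilling the peek-capping logic above; a limit
# pulled out of the plan tightens (never loosens) the requested peek.
from typing import Optional


def cap_peek(peek: Optional[int], limit: Optional[int]) -> Optional[int]:
    """Return the effective row cap for a read-API scan."""
    if limit is None:
        return peek
    if peek is None or limit < peek:
        return limit
    return peek


assert cap_peek(None, 300) == 300    # repr of a LIMIT 300 query reads <= 300 rows
assert cap_peek(100, 300) == 100     # peek(100) stays the tighter bound
assert cap_peek(None, None) is None  # no cap: full-scan semantics unchanged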

tests/system/small/session/test_read_gbq_colab.py
Lines changed: 52 additions & 0 deletions

@@ -19,6 +19,7 @@
 
 
 def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_session):
+    executions_before_sql = maybe_ordered_session._metrics.execution_count
     df = maybe_ordered_session._read_gbq_colab(
         """
         SELECT

@@ -32,16 +33,67 @@ def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_session):
         LIMIT 300
         """
     )
+    executions_before_python = maybe_ordered_session._metrics.execution_count
     batches = df.to_pandas_batches(
         page_size=100,
     )
+    executions_after = maybe_ordered_session._metrics.execution_count
 
     total_rows = 0
     for batch in batches:
         assert batch["total"].is_monotonic_decreasing
         total_rows += len(batch.index)
 
     assert total_rows > 0
+    assert executions_after == executions_before_python == executions_before_sql + 1
+
+
+def test_read_gbq_colab_peek_avoids_requery(maybe_ordered_session):
+    executions_before_sql = maybe_ordered_session._metrics.execution_count
+    df = maybe_ordered_session._read_gbq_colab(
+        """
+        SELECT
+            name,
+            SUM(number) AS total
+        FROM
+            `bigquery-public-data.usa_names.usa_1910_2013`
+        WHERE state LIKE 'W%'
+        GROUP BY name
+        ORDER BY total DESC
+        LIMIT 300
+        """
+    )
+    executions_before_python = maybe_ordered_session._metrics.execution_count
+    result = df.peek(100)
+    executions_after = maybe_ordered_session._metrics.execution_count
+
+    # OK, this isn't guaranteed by peek, but it should happen with the read-api-based impl.
+    # If this starts failing, maybe we stopped using the read api?
+    assert result["total"].is_monotonic_decreasing
+
+    assert len(result) == 100
+    assert executions_after == executions_before_python == executions_before_sql + 1
+
+
+def test_read_gbq_colab_repr_avoids_requery(maybe_ordered_session):
+    executions_before_sql = maybe_ordered_session._metrics.execution_count
+    df = maybe_ordered_session._read_gbq_colab(
+        """
+        SELECT
+            name,
+            SUM(number) AS total
+        FROM
+            `bigquery-public-data.usa_names.usa_1910_2013`
+        WHERE state LIKE 'W%'
+        GROUP BY name
+        ORDER BY total DESC
+        LIMIT 300
+        """
+    )
+    executions_before_python = maybe_ordered_session._metrics.execution_count
+    _ = repr(df)
+    executions_after = maybe_ordered_session._metrics.execution_count
+    assert executions_after == executions_before_python == executions_before_sql + 1
 
 
 def test_read_gbq_colab_includes_formatted_scalars(session):