Skip to content

Commit 7d93b35

Browse files
authored
chore: enable bigframes objects in _read_gbq_colab pyformat arg (#1727)
* chore: enable bigframes objects in _read_gbq_colab pyformat arg * use views for bigframes dataframe * move temp view logic to session * revert changes to block * revert more unnecessary view changes
1 parent f8d2cd2 commit 7d93b35

File tree

8 files changed

+137
-20
lines changed

8 files changed

+137
-20
lines changed

bigframes/core/blocks.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ def __init__(
153153

154154
self._stats_cache[" ".join(self.index_columns)] = {}
155155
self._transpose_cache: Optional[Block] = transpose_cache
156+
self._view_ref: Optional[bigquery.TableReference] = None
156157

157158
@classmethod
158159
def from_local(
@@ -2487,6 +2488,17 @@ def to_sql_query(
24872488
idx_labels,
24882489
)
24892490

2491+
def to_view(self, include_index: bool) -> bigquery.TableReference:
2492+
"""
2493+
Creates a temporary BigQuery VIEW with the SQL corresponding to this block.
2494+
"""
2495+
if self._view_ref is not None:
2496+
return self._view_ref
2497+
2498+
sql, _, _ = self.to_sql_query(include_index=include_index)
2499+
self._view_ref = self.session._create_temp_view(sql)
2500+
return self._view_ref
2501+
24902502
def cached(self, *, force: bool = False, session_aware: bool = False) -> None:
24912503
"""Write the block to a session table."""
24922504
# use a heuristic for whether something needs to be cached

bigframes/core/pyformat.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,30 +37,41 @@ def _table_to_sql(table: _BQ_TABLE_TYPES) -> str:
3737
return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`"
3838

3939

40-
def _field_to_template_value(name: str, value: Any) -> str:
40+
def _field_to_template_value(
41+
name: str,
42+
value: Any,
43+
) -> str:
4144
"""Convert value to something embeddable in a SQL string."""
4245
import bigframes.core.sql # Avoid circular imports
46+
import bigframes.dataframe # Avoid circular imports
4347

4448
_validate_type(name, value)
4549

4650
table_types = typing.get_args(_BQ_TABLE_TYPES)
4751
if isinstance(value, table_types):
4852
return _table_to_sql(value)
4953

50-
# TODO(tswast): convert DataFrame objects to gbq tables or a literals subquery.
54+
# TODO(tswast): convert pandas DataFrame objects to gbq tables or a literals subquery.
55+
if isinstance(value, bigframes.dataframe.DataFrame):
56+
return _table_to_sql(value._to_view())
57+
5158
return bigframes.core.sql.simple_literal(value)
5259

5360

5461
def _validate_type(name: str, value: Any):
5562
"""Raises TypeError if value is unsupported."""
5663
import bigframes.core.sql # Avoid circular imports
64+
import bigframes.dataframe # Avoid circular imports
5765

5866
if value is None:
5967
return # None can't be used in isinstance, but is a valid literal.
6068

61-
supported_types = typing.get_args(_BQ_TABLE_TYPES) + typing.get_args(
62-
bigframes.core.sql.SIMPLE_LITERAL_TYPES
69+
supported_types = (
70+
typing.get_args(_BQ_TABLE_TYPES)
71+
+ typing.get_args(bigframes.core.sql.SIMPLE_LITERAL_TYPES)
72+
+ (bigframes.dataframe.DataFrame,)
6373
)
74+
6475
if not isinstance(value, supported_types):
6576
raise TypeError(
6677
f"{name} has unsupported type: {type(value)}. "
@@ -80,8 +91,6 @@ def pyformat(
8091
sql_template: str,
8192
*,
8293
pyformat_args: dict,
83-
# TODO: add dry_run parameter to avoid expensive API calls in conversion
84-
# TODO: and session to upload data / convert to table if necessary
8594
) -> str:
8695
"""Unsafe Python-style string formatting of SQL string.
8796

bigframes/dataframe.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,19 @@ def astype(
394394

395395
return self._apply_unary_op(ops.AsTypeOp(dtype, safe_cast))
396396

397+
def _should_sql_have_index(self) -> bool:
398+
"""Should the SQL we pass to BQML and other I/O include the index?"""
399+
400+
return self._has_index and (
401+
self.index.name is not None or len(self.index.names) > 1
402+
)
403+
404+
def _to_view(self) -> bigquery.TableReference:
405+
"""Compiles this DataFrame's expression tree to SQL and saves it to a
406+
(temporary) view.
407+
"""
408+
return self._block.to_view(include_index=self._should_sql_have_index())
409+
397410
def _to_sql_query(
398411
self, include_index: bool, enable_cache: bool = True
399412
) -> Tuple[str, list[str], list[blocks.Label]]:
@@ -420,9 +433,7 @@ def sql(self) -> str:
420433
string representing the compiled SQL.
421434
"""
422435
try:
423-
include_index = self._has_index and (
424-
self.index.name is not None or len(self.index.names) > 1
425-
)
436+
include_index = self._should_sql_have_index()
426437
sql, _, _ = self._to_sql_query(include_index=include_index)
427438
return sql
428439
except AttributeError as e:

bigframes/session/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,6 @@ def _read_gbq_colab(
527527
query = bigframes.core.pyformat.pyformat(
528528
query,
529529
pyformat_args=pyformat_args,
530-
# TODO: add dry_run parameter to avoid API calls for data in pyformat_args
531530
)
532531

533532
return self._loader.read_gbq_query(
@@ -1938,6 +1937,10 @@ def _create_object_table(self, path: str, connection: str) -> str:
19381937

19391938
return table
19401939

1940+
def _create_temp_view(self, sql: str) -> bigquery.TableReference:
1941+
"""Create a random id Object Table from the input path and connection."""
1942+
return self._anon_dataset_manager.create_temp_view(sql)
1943+
19411944
def from_glob_path(
19421945
self, path: str, *, connection: Optional[str] = None, name: Optional[str] = None
19431946
) -> dataframe.DataFrame:

bigframes/session/_io/bigquery/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,28 @@ def create_temp_table(
139139
return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
140140

141141

142+
def create_temp_view(
143+
bqclient: bigquery.Client,
144+
table_ref: bigquery.TableReference,
145+
*,
146+
expiration: datetime.datetime,
147+
sql: str,
148+
) -> str:
149+
"""Create an empty table with an expiration in the desired session.
150+
151+
The table will be deleted when the session is closed or the expiration
152+
is reached.
153+
"""
154+
destination = bigquery.Table(table_ref)
155+
destination.expires = expiration
156+
destination.view_query = sql
157+
158+
# Ok if already exists, since this will only happen from retries internal to this method
159+
# as the requested table id has a random UUID4 component.
160+
bqclient.create_table(destination, exists_ok=True)
161+
return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
162+
163+
142164
def set_table_expiration(
143165
bqclient: bigquery.Client,
144166
table_ref: bigquery.TableReference,

bigframes/session/anonymous_dataset.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,20 @@ def __init__(
5353
def location(self):
5454
return self._location
5555

56+
def _default_expiration(self):
57+
"""When should the table expire automatically?"""
58+
return (
59+
datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
60+
)
61+
5662
def create_temp_table(
5763
self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
5864
) -> bigquery.TableReference:
5965
"""
6066
Allocates and and creates a table in the anonymous dataset.
6167
The table will be cleaned up by clean_up_tables.
6268
"""
63-
expiration = (
64-
datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
65-
)
69+
expiration = self._default_expiration()
6670
table = bf_io_bigquery.create_temp_table(
6771
self.bqclient,
6872
self.allocate_temp_table(),
@@ -73,6 +77,20 @@ def create_temp_table(
7377
)
7478
return bigquery.TableReference.from_string(table)
7579

80+
def create_temp_view(self, sql: str) -> bigquery.TableReference:
81+
"""
82+
Allocates and and creates a view in the anonymous dataset.
83+
The view will be cleaned up by clean_up_tables.
84+
"""
85+
expiration = self._default_expiration()
86+
table = bf_io_bigquery.create_temp_view(
87+
self.bqclient,
88+
self.allocate_temp_table(),
89+
expiration=expiration,
90+
sql=sql,
91+
)
92+
return bigquery.TableReference.from_string(table)
93+
7694
def allocate_temp_table(self) -> bigquery.TableReference:
7795
"""
7896
Allocates a unique table id, but does not create the table.

tests/system/small/session/test_read_gbq_colab.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,31 @@ def test_read_gbq_colab_includes_formatted_scalars(session):
7373
}
7474
),
7575
)
76+
77+
78+
def test_read_gbq_colab_includes_formatted_bigframes_dataframe(
79+
session, scalars_df_index, scalars_pandas_df_index
80+
):
81+
pyformat_args = {
82+
# Apply some operations to make sure the columns aren't renamed.
83+
"some_dataframe": scalars_df_index[scalars_df_index["int64_col"] > 0].assign(
84+
int64_col=scalars_df_index["int64_too"]
85+
),
86+
# This is not a supported type, but ignored if not referenced.
87+
"some_object": object(),
88+
}
89+
df = session._read_gbq_colab(
90+
"""
91+
SELECT int64_col, rowindex
92+
FROM {some_dataframe}
93+
ORDER BY rowindex ASC
94+
""",
95+
pyformat_args=pyformat_args,
96+
)
97+
result = df.to_pandas()
98+
expected = (
99+
scalars_pandas_df_index[scalars_pandas_df_index["int64_col"] > 0]
100+
.assign(int64_col=scalars_pandas_df_index["int64_too"])
101+
.reset_index(drop=False)[["int64_col", "rowindex"]]
102+
)
103+
pandas.testing.assert_frame_equal(result, expected)

tests/unit/session/test_read_gbq_colab.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414

1515
"""Unit tests for read_gbq_colab helper functions."""
1616

17+
import textwrap
18+
19+
from google.cloud import bigquery
20+
1721
from bigframes.testing import mocks
1822

1923

@@ -32,29 +36,39 @@ def test_read_gbq_colab_includes_label():
3236
assert "session-read_gbq_colab" in label_values
3337

3438

35-
def test_read_gbq_colab_includes_formatted_values_in_dry_run():
39+
def test_read_gbq_colab_includes_formatted_values_in_dry_run(monkeypatch):
3640
session = mocks.create_bigquery_session()
41+
bf_df = mocks.create_dataframe(monkeypatch, session=session)
42+
bf_df._to_view = lambda: bigquery.TableReference.from_string("my-project.my_dataset.some_view") # type: ignore
3743

3844
pyformat_args = {
3945
"some_integer": 123,
4046
"some_string": "This could be dangerous, but we escape it",
47+
"bf_df": bf_df,
4148
# This is not a supported type, but ignored if not referenced.
4249
"some_object": object(),
4350
}
51+
4452
_ = session._read_gbq_colab(
45-
"""
46-
SELECT {some_integer} as some_integer,
47-
{some_string} as some_string,
48-
'{{escaped}}' as escaped
49-
""",
53+
textwrap.dedent(
54+
"""
55+
SELECT {some_integer} as some_integer,
56+
{some_string} as some_string,
57+
'{{escaped}}' as escaped
58+
FROM {bf_df}
59+
"""
60+
),
5061
pyformat_args=pyformat_args,
5162
dry_run=True,
5263
)
53-
expected = """
64+
expected = textwrap.dedent(
65+
"""
5466
SELECT 123 as some_integer,
5567
'This could be dangerous, but we escape it' as some_string,
5668
'{escaped}' as escaped
69+
FROM `my-project`.`my_dataset`.`some_view`
5770
"""
71+
)
5872
queries = session._queries # type: ignore
5973
configs = session._job_configs # type: ignore
6074

0 commit comments

Comments
 (0)