Skip to content

Commit 056c6c8

Browse files
committed
fix: avoid table modification for to_gbq
1 parent 476b7dd commit 056c6c8

File tree

3 files changed

+85
-11
lines changed

3 files changed

+85
-11
lines changed

bigframes/core/compile/sqlglot/sqlglot_ir.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import dataclasses
1818
import typing
1919

20+
from google.cloud import bigquery
2021
import pyarrow as pa
2122
import sqlglot as sg
2223
import sqlglot.dialects.bigquery
@@ -104,6 +105,24 @@ def from_pyarrow(
104105
)
105106
return cls(expr=sg.select(sge.Star()).from_(expr), uid_gen=uid_gen)
106107

108+
@classmethod
def from_query_string(
    cls,
    query_string: str,
) -> SQLGlotIR:
    """Wraps a raw SQL string as a SQLGlotIR.

    The query text is placed inside a CTE with a generated ``bfcte_`` alias
    and re-selected via ``SELECT * FROM <alias>``, so later compilation steps
    can layer projections on top without modifying the original SQL text.
    """
    generator: guid.SequentialUIDGenerator = guid.SequentialUIDGenerator()
    alias = sge.to_identifier(
        next(generator.get_uid_stream("bfcte_")), quoted=cls.quoted
    )
    select_expr = sge.Select().select(sge.Star()).from_(sge.Table(this=alias))
    select_expr.set(
        "with",
        sge.With(expressions=[sge.CTE(this=query_string, alias=alias)]),
    )
    return cls(expr=select_expr, uid_gen=generator)
125+
107126
def select(
108127
self,
109128
selected_cols: tuple[tuple[str, sge.Expression], ...],
@@ -133,6 +152,36 @@ def project(
133152
select_expr = self.expr.select(*projected_cols_expr, append=True)
134153
return SQLGlotIR(expr=select_expr)
135154

155+
def insert(
    self,
    destination: bigquery.TableReference,
) -> str:
    """Renders this expression as an ``INSERT INTO <destination>`` statement."""
    target = _table(destination)
    statement = sge.insert(self.expr.subquery(), target)
    return statement.sql(dialect=self.dialect, pretty=self.pretty)
162+
163+
def replace(
    self,
    destination: bigquery.TableReference,
) -> str:
    """Renders a MERGE statement that replaces ``destination``'s contents.

    ``MERGE ... ON FALSE`` with a delete-when-not-matched-by-source clause and
    an insert-when-not-matched clause atomically swaps the table's rows for
    this expression's rows, without recreating the table itself.
    """
    # Workaround for SQLGlot breaking change:
    # https://github.com/tobymao/sqlglot/pull/4495
    when_clauses = (
        sge.When(matched=False, source=True, then=sge.Delete()),
        sge.When(matched=False, then=sge.Insert(this=sge.Var(this="ROW"))),
    )
    rendered_whens = [
        clause.sql(dialect=self.dialect, pretty=self.pretty)
        for clause in when_clauses
    ]

    merge_expr = sge.Merge(
        this=_table(destination),
        using=self.expr.subquery(),
        on=_literal(False, dtypes.BOOL_DTYPE),
    )
    merge_str = merge_expr.sql(dialect=self.dialect, pretty=self.pretty)
    whens_str = "\n".join(rendered_whens)
    return f"{merge_str}\n{whens_str}"
184+
136185
def _encapsulate_as_cte(
137186
self,
138187
) -> sge.Select:
@@ -190,3 +239,11 @@ def _literal(value: typing.Any, dtype: dtypes.Dtype) -> sge.Expression:
190239

191240
def _cast(arg: typing.Any, to: str) -> sge.Cast:
    """Builds a ``CAST(arg AS to)`` sqlglot expression."""
    return sge.Cast(to=to, this=arg)
242+
243+
244+
def _table(table: bigquery.TableReference) -> sge.Table:
    """Converts a BigQuery table reference into a fully-qualified, quoted
    sqlglot table expression (`project`.`dataset`.`table`)."""
    project_id = sg.to_identifier(table.project, quoted=True)
    dataset_id = sg.to_identifier(table.dataset_id, quoted=True)
    table_id = sg.to_identifier(table.table_id, quoted=True)
    return sge.Table(this=table_id, db=dataset_id, catalog=project_id)

bigframes/session/bq_caching_executor.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import bigframes.core
3131
from bigframes.core import compile, rewrite
32+
import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir
3233
import bigframes.core.guid
3334
import bigframes.core.nodes as nodes
3435
import bigframes.core.ordering as order
@@ -206,17 +207,31 @@ def export_gbq(
206207
if bigframes.options.compute.enable_multi_query_execution:
207208
self._simplify_with_caching(array_value)
208209

209-
dispositions = {
210-
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
211-
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
212-
"append": bigquery.WriteDisposition.WRITE_APPEND,
213-
}
210+
table_exists = True
211+
try:
212+
table = self.bqclient.get_table(destination)
213+
if if_exists == "fail":
214+
raise ValueError(f"Table already exists: {destination.__str__()}")
215+
except google.api_core.exceptions.NotFound:
216+
table_exists = False
217+
218+
# b/409086472: Uses DML for table appends and replacements to avoid
219+
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
220+
# https://cloud.google.com/bigquery/quotas#standard_tables
214221
sql = self.to_sql(array_value, ordered=False)
215-
job_config = bigquery.QueryJobConfig(
216-
write_disposition=dispositions[if_exists],
217-
destination=destination,
218-
clustering_fields=cluster_cols if cluster_cols else None,
219-
)
222+
if not table_exists:
223+
job_config = bigquery.QueryJobConfig(
224+
destination=destination,
225+
clustering_fields=cluster_cols if cluster_cols else None,
226+
)
227+
else:
228+
job_config = bigquery.QueryJobConfig()
229+
ir = sqlglot_ir.SQLGlotIR.from_query_string(sql)
230+
if if_exists == "append":
231+
sql = ir.insert(destination)
232+
else: # for "replace"
233+
sql = ir.replace(destination)
234+
220235
# TODO(swast): plumb through the api_name of the user-facing api that
221236
# caused this query.
222237
_, query_job = self._run_execute_query(

tests/system/small/test_dataframe_io.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,9 @@ def test_to_gbq_if_exists(
515515
expected_index,
516516
):
517517
"""Test the `to_gbq` API with the `if_exists` parameter."""
518-
destination_table = f"{dataset_id}.test_to_gbq_if_exists_{if_exists}"
518+
destination_table = (
519+
f"{dataset_id}.test_to_gbq_if_exists_{if_exists}_{expected_index}"
520+
)
519521

520522
scalars_df_default_index.to_gbq(destination_table)
521523
scalars_df_default_index.to_gbq(destination_table, if_exists=if_exists)

0 commit comments

Comments
 (0)