diff --git a/bigframes/streaming/dataframe.py b/bigframes/streaming/dataframe.py index 90c638b82e..2180a66207 100644 --- a/bigframes/streaming/dataframe.py +++ b/bigframes/streaming/dataframe.py @@ -24,7 +24,7 @@ from google.cloud import bigquery from bigframes import dataframe -from bigframes.core import log_adapter +from bigframes.core import log_adapter, nodes import bigframes.exceptions as bfe import bigframes.session @@ -54,7 +54,7 @@ def _curate_df_doc(doc: Optional[str]): class StreamingBase: - sql: str + _appends_sql: str _session: bigframes.session.Session def to_bigtable( @@ -124,7 +124,7 @@ def to_bigtable( can be examined. """ return _to_bigtable( - self.sql, + self._appends_sql, instance=instance, table=table, service_account_email=service_account_email, @@ -181,7 +181,7 @@ def to_pubsub( can be examined. """ return _to_pubsub( - self.sql, + self._appends_sql, topic=topic, service_account_email=service_account_email, session=self._session, @@ -218,6 +218,19 @@ def __init__(self, df: dataframe.DataFrame, *, create_key=0): def _from_table_df(cls, df: dataframe.DataFrame) -> StreamingDataFrame: return cls(df, create_key=cls._create_key) + @property + def _original_table(self): + def traverse(node: nodes.BigFrameNode): + if isinstance(node, nodes.ReadTableNode): + return f"{node.source.table.project_id}.{node.source.table.dataset_id}.{node.source.table.table_id}" + for child in node.child_nodes: + original_table = traverse(child) + if original_table: + return original_table + return None + + return traverse(self._df._block._expr.node) + def __getitem__(self, *args, **kwargs): return _return_type_wrapper(self._df.__getitem__, StreamingDataFrame)( *args, **kwargs @@ -266,6 +279,17 @@ def sql(self): sql.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.sql)) + # Patch for the required APPENDS clause + @property + def _appends_sql(self): + sql_str = self.sql + original_table = self._original_table + assert original_table is not None + + appends_clause = f"APPENDS(TABLE `{original_table}`, NULL, NULL)" + sql_str = sql_str.replace(f"`{original_table}`", appends_clause) + return sql_str + @property def _session(self): return self._df._session