Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions bigframes/streaming/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from google.cloud import bigquery

from bigframes import dataframe
from bigframes.core import log_adapter
from bigframes.core import log_adapter, nodes
import bigframes.exceptions as bfe
import bigframes.session

Expand Down Expand Up @@ -54,7 +54,7 @@ def _curate_df_doc(doc: Optional[str]):


class StreamingBase:
sql: str
_appends_sql: str
_session: bigframes.session.Session

def to_bigtable(
Expand Down Expand Up @@ -124,7 +124,7 @@ def to_bigtable(
can be examined.
"""
return _to_bigtable(
self.sql,
self._appends_sql,
instance=instance,
table=table,
service_account_email=service_account_email,
Expand Down Expand Up @@ -181,7 +181,7 @@ def to_pubsub(
can be examined.
"""
return _to_pubsub(
self.sql,
self._appends_sql,
topic=topic,
service_account_email=service_account_email,
session=self._session,
Expand Down Expand Up @@ -218,6 +218,19 @@ def __init__(self, df: dataframe.DataFrame, *, create_key=0):
def _from_table_df(cls, df: dataframe.DataFrame) -> StreamingDataFrame:
return cls(df, create_key=cls._create_key)

@property
def _original_table(self):
def traverse(node: nodes.BigFrameNode):
if isinstance(node, nodes.ReadTableNode):
return f"{node.source.table.project_id}.{node.source.table.dataset_id}.{node.source.table.table_id}"
for child in node.child_nodes:
original_table = traverse(child)
if original_table:
return original_table
return None

return traverse(self._df._block._expr.node)

def __getitem__(self, *args, **kwargs):
return _return_type_wrapper(self._df.__getitem__, StreamingDataFrame)(
*args, **kwargs
Expand Down Expand Up @@ -266,6 +279,17 @@ def sql(self):

sql.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.sql))

# Patch for the required APPENDS clause
@property
def _appends_sql(self):
sql_str = self.sql
original_table = self._original_table
assert original_table is not None

appends_clause = f"APPENDS(TABLE `{original_table}`, NULL, NULL)"
sql_str = sql_str.replace(f"`{original_table}`", appends_clause)
return sql_str

@property
def _session(self):
return self._df._session
Expand Down