From a9edb2aad584f5c360231bd63ec448a476a2a459 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Sat, 26 Apr 2025 22:08:38 -0500 Subject: [PATCH 01/18] perf: defer query in `read_gbq` with wildcard tables --- bigframes/core/nodes.py | 6 ++- bigframes/core/schema.py | 12 ++++-- bigframes/core/tools/bigquery.py | 39 +++++++++++++++++++ bigframes/session/_io/bigquery/__init__.py | 3 ++ .../session/_io/bigquery/read_gbq_table.py | 9 +++++ bigframes/session/bq_caching_executor.py | 1 + bigframes/session/loader.py | 6 +-- tests/system/small/test_dataframe.py | 14 +++++++ 8 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 bigframes/core/tools/bigquery.py diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index f7327f2a7a..77f6565c91 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -708,10 +708,12 @@ class GbqTable: @staticmethod def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: # Subsetting fields with columns can reduce cost of row-hash default ordering + table_schema = bigframes.core.tools.bigquery.get_schema_and_pseudocolumns(table) + if columns: - schema = tuple(item for item in table.schema if item.name in columns) + schema = tuple(item for item in table_schema if item.name in columns) else: - schema = tuple(table.schema) + schema = tuple(table_schema) return GbqTable( project_id=table.project, dataset_id=table.dataset_id, diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py index 4f636ab210..6243ed86c8 100644 --- a/bigframes/core/schema.py +++ b/bigframes/core/schema.py @@ -48,15 +48,19 @@ def from_bq_table( typing.Dict[str, bigframes.dtypes.Dtype] ] = None, ): + # Avoid circular imports. + import bigframes.core.tools.bigquery + if column_type_overrides is None: column_type_overrides = {} - items = tuple( + items = [ SchemaItem(name, column_type_overrides.get(name, dtype)) for name, dtype in bigframes.dtypes.bf_type_from_type_kind( - table.schema + bigframes.core.tools.bigquery.get_schema_and_pseudocolumns(table) ).items() - ) - return ArraySchema(items) + ] + + return ArraySchema(tuple(items)) @property def names(self) -> typing.Tuple[str, ...]: diff --git a/bigframes/core/tools/bigquery.py b/bigframes/core/tools/bigquery.py new file mode 100644 index 0000000000..13853a3ada --- /dev/null +++ b/bigframes/core/tools/bigquery.py @@ -0,0 +1,39 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Private helpers for loading a BigQuery table as a BigQuery DataFrames DataFrame. +""" + +from __future__ import annotations + +import google.cloud.bigquery as bigquery + + +def get_schema_and_pseudocolumns( + table: bigquery.table.Table, +) -> list[bigquery.SchemaField]: + fields = list(table.schema) + + # TODO(tswast): Add _PARTITIONTIME and/or _PARTIONDATE for injestion + # time partitioned tables. 
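+    # A possible sketch for that TODO (illustrative only, not part of this
+    # change): ingestion-time partitioned tables report time partitioning
+    # with no partitioning field, so something like the following could work:
+    #   if table.time_partitioning and table.time_partitioning.field is None:
+    #       fields.append(bigquery.SchemaField("_PARTITIONTIME", "TIMESTAMP"))
+    #       fields.append(bigquery.SchemaField("_PARTITIONDATE", "DATE"))
+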
+ if table.table_id.endswith("*"): + fields.append( + bigquery.SchemaField( + "_TABLE_SUFFIX", + "STRING", + ) + ) + + return fields diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index 4fdd836777..08b05452a2 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -393,6 +393,9 @@ def to_query( else: select_clause = "SELECT *" + if query_or_table.endswith("*"): + select_clause += ", _TABLE_SUFFIX" + time_travel_clause = "" if time_travel_timestamp is not None: time_travel_literal = bigframes.core.sql.simple_literal(time_travel_timestamp) diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 34183b22bc..a07a2508f3 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -101,7 +101,16 @@ def validate_table( # Anonymous dataset, does not support snapshot ever if table.dataset_id.startswith("_"): pass + # Only true tables support time travel + elif table.table_id.endswith("*"): + msg = bfe.format_message( + "Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. " + "Attempting query without time travel. Be aware that " + "modifications to the underlying data may result in errors or " + "unexpected behavior." + ) + warnings.warn(msg, category=bfe.TimeTravelDisabledWarning) elif table.table_type != "TABLE": if table.table_type == "MATERIALIZED_VIEW": msg = bfe.format_message( diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 584b41452a..474b90835f 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -174,6 +174,7 @@ def export_gbq( # Only update schema if this is not modifying an existing table, and the # new table contains timedelta columns. table = self.bqclient.get_table(destination) + # TODO(tswast): What to do with pseudocolumns? table.schema = array_value.schema.to_bigquery() self.bqclient.update_table(table, ["schema"]) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 76f12ae438..03058ad850 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -442,9 +442,9 @@ def read_gbq_table( # when max_results is set. # TODO(b/338419730): We don't need to fallback to a query for wildcard # tables if we allow some non-determinism when time travel isn't supported. - if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix( - query - ): + if ( + max_results is not None + ): # or bf_io_bigquery.is_table_with_wildcard_suffix(query): # TODO(b/338111344): If we are running a query anyway, we might as # well generate ROW_NUMBER() at the same time. 
all_columns: Iterable[str] = ( diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 452fa841e4..192118ec92 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -14,6 +14,7 @@ import io import operator +import re import sys import tempfile import typing @@ -5284,6 +5285,19 @@ def test_to_gbq_and_create_dataset(session, scalars_df_index, dataset_id_not_cre assert not loaded_scalars_df_index.empty +def test_read_gbq_to_pandas_wildcard(unordered_session: bigframes.Session): + with pytest.warns( + bigframes.exceptions.TimeTravelDisabledWarning, + match=re.escape("Wildcard tables do not support FOR SYSTEM_TIME"), + ): + df = unordered_session.read_gbq("bigquery-public-data.noaa_gsod.gsod*") + df = df[df["_TABLE_SUFFIX"] == "1929"][["da", "mo", "year", "max"]] + df.to_pandas() + rows, columns = df.shape + assert rows > 0 + assert columns == 4 + + def test_read_gbq_to_pandas_no_exec(unordered_session: bigframes.Session): metrics = unordered_session._metrics execs_pre = metrics.execution_count From df795b154461ca1b2f0927d84378023b38cfa0ac Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Sat, 26 Apr 2025 22:13:26 -0500 Subject: [PATCH 02/18] remove obsolete comments --- bigframes/session/loader.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 03058ad850..89375e063d 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -440,11 +440,7 @@ def read_gbq_table( # clustered tables, so fallback to a query. We do this here so that # the index is consistent with tables that have primary keys, even # when max_results is set. - # TODO(b/338419730): We don't need to fallback to a query for wildcard - # tables if we allow some non-determinism when time travel isn't supported. - if ( - max_results is not None - ): # or bf_io_bigquery.is_table_with_wildcard_suffix(query): + if max_results is not None: # TODO(b/338111344): If we are running a query anyway, we might as # well generate ROW_NUMBER() at the same time. all_columns: Iterable[str] = ( From 79f4c58210b6049437e632037e25cf1da5987b3d Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Mon, 28 Apr 2025 17:44:18 -0500 Subject: [PATCH 03/18] use sql node instead of ibis table node to keep select * from omitting pseudocolumns Fixes this code sample: import bigframes.pandas as bpd df = bpd.read_gbq("bigquery-public-data.google_analytics_sample.ga_sessions_*") df[df["_TABLE_SUFFIX"] == "20161204"].peek() --- bigframes/core/compile/compiler.py | 7 ++++++- tests/unit/session/test_io_bigquery.py | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index a778889925..2a4f6f1506 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -219,7 +219,12 @@ def _table_to_ibis( physical_schema = ibis_bigquery.BigQuerySchema.to_ibis( list(source.table.physical_schema) ) - if source.at_time is not None or source.sql_predicate is not None: + if ( + source.at_time is not None + or source.sql_predicate is not None + # TODO(tswast): make a more general way to check if the table node has pseudocolumns. 
+ or source.table.table_id.endswith("*") + ): import bigframes.session._io.bigquery sql = bigframes.session._io.bigquery.to_query( diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index af2c7714ab..225362d522 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -406,7 +406,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], None, # max_results None, # time_travel_timestampe - "SELECT * FROM `test_table*`", + "SELECT *, _TABLE_SUFFIX FROM `test_table*`", id="wildcard-no_params", ), pytest.param( @@ -415,7 +415,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [("_TABLE_SUFFIX", ">", "2022-10-20")], None, # max_results None, # time_travel_timestampe - "SELECT * FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", + "SELECT *, _TABLE_SUFFIX FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", id="wildcard-filter", ), ], From 5b0d0a0a78eb19493cce8ac1185cc062259fb2f3 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Mon, 28 Apr 2025 21:21:07 -0500 Subject: [PATCH 04/18] test with cache and to_gbq --- bigframes/core/array_value.py | 8 ++++++-- bigframes/core/utils.py | 20 ++++++++++++++++++++ bigframes/session/loader.py | 4 ++-- tests/system/small/test_dataframe.py | 17 +++++++++++++++++ tests/system/small/test_dataframe_io.py | 20 ++++++++++++++++++++ 5 files changed, 65 insertions(+), 4 deletions(-) diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py index eba63ad72e..fb57858803 100644 --- a/bigframes/core/array_value.py +++ b/bigframes/core/array_value.py @@ -122,7 +122,11 @@ def from_table( # Scan all columns by default, we define this list as it can be pruned while preserving source_def scan_list = nodes.ScanList( tuple( - nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column) + nodes.ScanItem( + ids.ColumnId(bigframes.core.guid.generate_guid()), + item.dtype, + item.column, + ) for item in schema.items ) ) @@ -143,7 +147,7 @@ def from_table( @property def column_ids(self) -> typing.Sequence[str]: """Returns column ids as strings.""" - return self.schema.names + return [id_.name for id_ in self.node.ids] @property def session(self) -> Session: diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index 9731857ea0..f379c5fc1e 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -145,6 +145,26 @@ def label_to_identifier(label: typing.Hashable, strict: bool = False) -> str: elif identifier[0].isdigit(): # first character must be letter or underscore identifier = "_" + identifier + + # Except in special circumstances (true anonymous query results tables), + # field names are not allowed to start with these (case-insensitive) + # prefixes. + # _PARTITION, _TABLE_, _FILE_, _ROW_TIMESTAMP, __ROOT__ and _COLIDENTIFIER + if any( + identifier.casefold().startswith(invalid_prefix.casefold()) + for invalid_prefix in ( + "_PARTITION", + "_TABLE_", + "_FILE_", + "_ROW_TIMESTAMP", + "__ROOT__", + "_COLIDENTIFIER", + ) + ): + # Remove leading _ character(s) to avoid collisions with preserved + # prefixes. 
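+        # For example, "_TABLE_SUFFIX" becomes "TABLE_SUFFIX" and
+        # "__ROOT__col" becomes "ROOT__col".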
+ identifier = re.sub("^_+", "", identifier) + return identifier diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 89375e063d..1d3c47a9bc 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -556,11 +556,11 @@ def read_gbq_table( index_cols = [sequential_index_col] index_names = [None] - value_columns = [col for col in array_value.column_ids if col not in index_cols] + value_names = [col for col in schema.names if col not in index_names] block = blocks.Block( array_value, index_columns=index_cols, - column_labels=value_columns, + column_labels=value_names, index_labels=index_names, ) if max_results: diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 192118ec92..91a752511b 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -5051,6 +5051,23 @@ def test_df_cached(scalars_df_index): pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas()) +def test_df_cached_w_wildcard_table(session): + """Test the `cache()` API with a DataFrame that contains pseudocolumns from wildcard tables + + Regression test for internal issue b/405773140. + """ + df = session.read_gbq("bigquery-public-data.google_analytics_sample.ga_sessions_*") + df = ( + df[df["_TABLE_SUFFIX"] == "20161204"] + .groupby( + ["visitorId", "visitNumber", "visitId", "_TABLE_SUFFIX"], as_index=False + ) + .size() + ) + df_cached_copy = df.cache() + pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas()) + + def test_assign_after_binop_row_joins(): pd_df = pd.DataFrame( { diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index e12db3f598..9996138a7d 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -656,6 +656,26 @@ def test_to_gbq_w_None_column_names( ) +def test_to_gbq_w_wildcard_table(session, dataset_id): + """Test the `to_gbq` API with a DataFrame that contains pseudocolumns from wildcard tables + + Regression test for internal issue b/405773140. + """ + destination_table = f"{dataset_id}.test_to_gbq_w_wildcard_table" + df = session.read_gbq("bigquery-public-data.google_analytics_sample.ga_sessions_*") + df = df[df["_TABLE_SUFFIX"] == "20161204"][ + ["visitorId", "visitNumber", "visitId", "_TABLE_SUFFIX"] + ] + df.to_gbq(destination_table, if_exists="replace") + + bf_result = bpd.read_gbq(destination_table) + pd.testing.assert_index_equal( + bf_result.columns, + # Remove leading _ to allow serialization. 
+ pd.Index(["visitorId", "visitNumber", "visitId", "TABLE_SUFFIX"]), + ) + + @pytest.mark.parametrize( "clustering_columns", [ From 118964b84a72a031f24b758989e0eeaef4e1263f Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 09:57:05 -0500 Subject: [PATCH 05/18] rename columns before caching --- bigframes/core/array_value.py | 8 ++------ bigframes/core/tools/datetimes.py | 7 ++++++- bigframes/session/bq_caching_executor.py | 22 ++++++++++++++++++---- tests/system/small/test_dataframe.py | 8 ++++++-- tests/system/small/test_dataframe_io.py | 8 ++++++-- tests/system/small/test_series.py | 10 ++++++++++ 6 files changed, 48 insertions(+), 15 deletions(-) diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py index fb57858803..eba63ad72e 100644 --- a/bigframes/core/array_value.py +++ b/bigframes/core/array_value.py @@ -122,11 +122,7 @@ def from_table( # Scan all columns by default, we define this list as it can be pruned while preserving source_def scan_list = nodes.ScanList( tuple( - nodes.ScanItem( - ids.ColumnId(bigframes.core.guid.generate_guid()), - item.dtype, - item.column, - ) + nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column) for item in schema.items ) ) @@ -147,7 +143,7 @@ def from_table( @property def column_ids(self) -> typing.Sequence[str]: """Returns column ids as strings.""" - return [id_.name for id_ in self.node.ids] + return self.schema.names @property def session(self) -> Session: diff --git a/bigframes/core/tools/datetimes.py b/bigframes/core/tools/datetimes.py index 2abb86a2f3..26afdc7910 100644 --- a/bigframes/core/tools/datetimes.py +++ b/bigframes/core/tools/datetimes.py @@ -52,7 +52,7 @@ def to_datetime( f"to datetime is not implemented. {constants.FEEDBACK_LINK}" ) - arg = bigframes.series.Series(arg)._cached() + arg = bigframes.series.Series(arg) if format and unit and arg.dtype in (bigframes.dtypes.INT_DTYPE, bigframes.dtypes.FLOAT_DTYPE): # type: ignore raise ValueError("cannot specify both format and unit") @@ -74,6 +74,11 @@ def to_datetime( ) assert unit is None + + # The following operations evaluate individual values to infer a format, + # so cache if needed. + arg = arg._cached(force=False) + as_datetime = arg._apply_unary_op( # type: ignore ops.ToDatetimeOp( format=format, diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 474b90835f..96e8bd55a3 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -32,6 +32,7 @@ import bigframes.core.nodes as nodes import bigframes.core.ordering as order import bigframes.core.tree_properties as tree_properties +import bigframes.core.utils as utils import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.features @@ -374,6 +375,15 @@ def _cache_with_cluster_cols( ): """Executes the query and uses the resulting table to rewrite future executions.""" + # Pseudocolumns can be queried and even written to anonymous query + # results tables, but they can't be materialized to an explicit + # destination table. Therefore, we rename the columns to all be writable + # before executing the SQL. See: b/405773140 for discussion about the + # _TABLE_SUFFIX pseudocolumn. 
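+        # The rename is reversed (new ids back to the original ids) once the
+        # temporary table exists, so callers continue to see pseudocolumn
+        # names such as _TABLE_SUFFIX.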
+ prev_col_ids = array_value.column_ids + new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) + array_value = array_value.rename_columns(dict(zip(prev_col_ids, new_col_ids))) + sql, schema, ordering_info = self.compiler.compile_raw( self.replace_cached_subtrees(array_value.node) ) @@ -382,10 +392,14 @@ def _cache_with_cluster_cols( schema, cluster_cols=bq_io.select_cluster_cols(schema, cluster_cols), ) - cached_replacement = array_value.as_cached( - cache_table=self.bqclient.get_table(tmp_table), - ordering=ordering_info, - ).node + cached_replacement = ( + array_value.as_cached( + cache_table=self.bqclient.get_table(tmp_table), + ordering=ordering_info, + ) + .rename_columns(dict(zip(new_col_ids, prev_col_ids))) + .node + ) self._cached_executions[array_value.node] = cached_replacement def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 91a752511b..aac975a12f 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -5051,12 +5051,16 @@ def test_df_cached(scalars_df_index): pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas()) -def test_df_cached_w_wildcard_table(session): +def test_df_cached_w_wildcard_table(unordered_session): """Test the `cache()` API with a DataFrame that contains pseudocolumns from wildcard tables Regression test for internal issue b/405773140. + + Uses unordered_session to avoid full table scan. """ - df = session.read_gbq("bigquery-public-data.google_analytics_sample.ga_sessions_*") + df = unordered_session.read_gbq( + "bigquery-public-data.google_analytics_sample.ga_sessions_*" + ) df = ( df[df["_TABLE_SUFFIX"] == "20161204"] .groupby( diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 9996138a7d..34fa9b4c78 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -656,13 +656,17 @@ def test_to_gbq_w_None_column_names( ) -def test_to_gbq_w_wildcard_table(session, dataset_id): +def test_to_gbq_w_wildcard_table(unordered_session, dataset_id): """Test the `to_gbq` API with a DataFrame that contains pseudocolumns from wildcard tables Regression test for internal issue b/405773140. + + Uses unordered_session to avoid full table scan. """ destination_table = f"{dataset_id}.test_to_gbq_w_wildcard_table" - df = session.read_gbq("bigquery-public-data.google_analytics_sample.ga_sessions_*") + df = unordered_session.read_gbq( + "bigquery-public-data.google_analytics_sample.ga_sessions_*" + ) df = df[df["_TABLE_SUFFIX"] == "20161204"][ ["visitorId", "visitNumber", "visitId", "_TABLE_SUFFIX"] ] diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index c63bf8e12b..27b9ba7f14 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -4143,6 +4143,16 @@ def test_apply_numpy_ufunc(scalars_dfs, ufunc): assert_series_equal(bf_result, pd_result) +def test_series_cached_w_wildcard_table(unordered_session): + """Test the `cache()` API with a Series that contains pseudocolumns from wildcard tables + + Regression test for internal issue b/405773140. 
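+
+    Uses unordered_session to avoid full table scan.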
+ """ + df = unordered_session.read_gbq("bigquery-public-data.usa_names.usa_1910_20*") + series_cached_copy = df["_TABLE_SUFFIX"].cache() + assert len(series_cached_copy) == len(df) + + @pytest.mark.parametrize( ("ufunc",), [ From ca33463ed74777d475269a158adc02a2d6385678 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 09:59:17 -0500 Subject: [PATCH 06/18] remove unnecessary comment --- bigframes/session/bq_caching_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 96e8bd55a3..459794f6a9 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -175,7 +175,6 @@ def export_gbq( # Only update schema if this is not modifying an existing table, and the # new table contains timedelta columns. table = self.bqclient.get_table(destination) - # TODO(tswast): What to do with pseudocolumns? table.schema = array_value.schema.to_bigquery() self.bqclient.update_table(table, ["schema"]) From 4897ca49f713b1e678ddb9d22202347c8541678a Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 10:08:28 -0500 Subject: [PATCH 07/18] add missing import --- bigframes/core/nodes.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 77f6565c91..09df62bd48 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -707,6 +707,8 @@ class GbqTable: @staticmethod def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: + import bigframes.core.tools.bigquery # Avoid circular imports. + # Subsetting fields with columns can reduce cost of row-hash default ordering table_schema = bigframes.core.tools.bigquery.get_schema_and_pseudocolumns(table) From e1a7341835bc5e80d081eee5c939e2e23b81f09a Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 11:28:00 -0500 Subject: [PATCH 08/18] do not materialize _TABLE_SUFFIX --- bigframes/session/_io/bigquery/__init__.py | 2 +- .../session/_io/bigquery/read_gbq_table.py | 2 +- bigframes/session/bq_caching_executor.py | 35 ++++++++++++++----- bigframes/session/loader.py | 22 ++++++++++-- tests/system/small/test_session.py | 3 +- 5 files changed, 50 insertions(+), 14 deletions(-) diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index 08b05452a2..a6255dccaa 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -394,7 +394,7 @@ def to_query( select_clause = "SELECT *" if query_or_table.endswith("*"): - select_clause += ", _TABLE_SUFFIX" + select_clause += ", _TABLE_SUFFIX AS _BF_TABLE_SUFFIX" time_travel_clause = "" if time_travel_timestamp is not None: diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index a07a2508f3..fd77e4a016 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -146,7 +146,7 @@ def validate_table( sql_predicate=filter_str, time_travel_timestamp=None, ) - # Any erorrs here should just be raised to user + # Any errors here should just be raised to user bqclient.query_and_wait( snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True) ) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 459794f6a9..9687ef7329 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -282,7 +282,12 
@@ def get_row_count(self, array_value: bigframes.core.ArrayValue) -> int: row_count_plan = self.replace_cached_subtrees( generate_row_count_plan(array_value.node) ) - results = self._execute_plan(row_count_plan, ordered=True) + results = self._execute_plan( + row_count_plan, + ordered=True, + # We only expect a single integer, so don't create a destination table. + destination=None, + ) pa_table = next(results.arrow_batches()) pa_array = pa_table.column(0) return pa_array.tolist()[0] @@ -381,10 +386,10 @@ def _cache_with_cluster_cols( # _TABLE_SUFFIX pseudocolumn. prev_col_ids = array_value.column_ids new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) - array_value = array_value.rename_columns(dict(zip(prev_col_ids, new_col_ids))) + renamed = array_value.rename_columns(dict(zip(prev_col_ids, new_col_ids))) sql, schema, ordering_info = self.compiler.compile_raw( - self.replace_cached_subtrees(array_value.node) + self.replace_cached_subtrees(renamed.node) ) tmp_table = self._sql_as_cached_temp_table( sql, @@ -392,7 +397,7 @@ def _cache_with_cluster_cols( cluster_cols=bq_io.select_cluster_cols(schema, cluster_cols), ) cached_replacement = ( - array_value.as_cached( + renamed.as_cached( cache_table=self.bqclient.get_table(tmp_table), ordering=ordering_info, ) @@ -405,6 +410,16 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" offset_column = bigframes.core.guid.generate_guid("bigframes_offsets") w_offsets, offset_column = array_value.promote_offsets() + + # Pseudocolumns can be queried and even written to anonymous query + # results tables, but they can't be materialized to an explicit + # destination table. Therefore, we rename the columns to all be writable + # before executing the SQL. See: b/405773140 for discussion about the + # _TABLE_SUFFIX pseudocolumn. + prev_col_ids = w_offsets.column_ids + new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) + renamed = w_offsets.rename_columns(dict(zip(prev_col_ids, new_col_ids))) + sql = self.compiler.compile( self.replace_cached_subtrees(w_offsets.node), ordered=False ) @@ -414,10 +429,14 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): w_offsets.schema.to_bigquery(), cluster_cols=[offset_column], ) - cached_replacement = array_value.as_cached( - cache_table=self.bqclient.get_table(tmp_table), - ordering=order.TotalOrdering.from_offset_col(offset_column), - ).node + cached_replacement = ( + renamed.as_cached( + cache_table=self.bqclient.get_table(tmp_table), + ordering=order.TotalOrdering.from_offset_col(offset_column), + ) + .rename_columns(dict(zip(new_col_ids, prev_col_ids))) + .node + ) self._cached_executions[array_value.node] = cached_replacement def _cache_with_session_awareness( diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 1d3c47a9bc..c37fb454c6 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -452,20 +452,25 @@ def read_gbq_table( sql_predicate=bf_io_bigquery.compile_filters(filters) if filters else None, - max_results=max_results, # We're executing the query, so we don't need time travel for # determinism. time_travel_timestamp=None, ) - return self.read_gbq_query( + df = self.read_gbq_query( query, index_col=index_cols, columns=columns, api_name=api_name, use_cache=use_cache, + max_results=max_results, ) + # TODO(tswast): Handle pseudocolumns more generally. 
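+        # The SQL from to_query() aliases `_TABLE_SUFFIX` as `_BF_TABLE_SUFFIX`
+        # because a column named `_TABLE_SUFFIX` cannot be materialized to a
+        # destination table; restore the user-facing pseudocolumn name here.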
+ if "_BF_TABLE_SUFFIX" in df.columns: + df = df.rename(columns={"_BF_TABLE_SUFFIX": "_TABLE_SUFFIX"}) + return df + # ----------------------------------------- # Validate table access and features # ----------------------------------------- @@ -685,6 +690,7 @@ def read_gbq_query( cluster_candidates=[], api_name=api_name, configuration=configuration, + max_results=max_results, ) if self._metrics is not None: @@ -723,6 +729,7 @@ def _query_to_destination( api_name: str, configuration: dict = {"query": {"useQueryCache": True}}, do_clustering=True, + max_results: Optional[int] = None, ) -> Tuple[Optional[bigquery.TableReference], bigquery.QueryJob]: # If a dry_run indicates this is not a query type job, then don't # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement. @@ -731,7 +738,16 @@ def _query_to_destination( _, dry_run_job = self._start_query( query, job_config=dry_run_config, api_name=api_name ) - if dry_run_job.statement_type != "SELECT": + if ( + dry_run_job.statement_type != "SELECT" + # If max_results is set, we probably don't have > 10 GB of results, + # so avoid creating a destination table. This is necessary for + # handling max_results with wildcard tables, since those include a + # pseudocolumn that can only be materialized to anonymous results + # tables. See: b/405773140 for discussion about the _TABLE_SUFFIX + # pseudocolumn. + or max_results is not None + ): _, query_job = self._start_query(query, api_name=api_name) return query_job.destination, query_job diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 1b886fbc5a..0d84196dae 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -563,7 +563,8 @@ def test_read_gbq_wildcard( if columns else [ field.name - for field in table_metadata.schema + for field in list(table_metadata.schema) + + [bigquery.SchemaField("_TABLE_SUFFIX", "STRING")] if field.name not in index_col and field.name not in columns ] ) From af06200f19c7445cccf66391dd2fc0e804df5cb6 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 11:42:48 -0500 Subject: [PATCH 09/18] fix unit tests --- tests/unit/session/test_io_bigquery.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 225362d522..7f1eadf458 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -387,7 +387,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) ["col_a", "col_b"], [], None, # max_results - None, # time_travel_timestampe + None, # time_travel_timestamp "SELECT `col_a`, `col_b` FROM `test_table`", id="table-columns", ), @@ -396,7 +396,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], [("date_col", ">", "2022-10-20")], None, # max_results - None, # time_travel_timestampe + None, # time_travel_timestamp "SELECT * FROM `test_table` WHERE `date_col` > '2022-10-20'", id="table-filter", ), @@ -405,8 +405,8 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], [], None, # max_results - None, # time_travel_timestampe - "SELECT *, _TABLE_SUFFIX FROM `test_table*`", + None, # time_travel_timestamp + "SELECT *, _TABLE_SUFFIX AS _BF_TABLE_SUFFIX FROM `test_table*`", id="wildcard-no_params", ), pytest.param( @@ -414,8 +414,8 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], [("_TABLE_SUFFIX", ">", 
"2022-10-20")], None, # max_results - None, # time_travel_timestampe - "SELECT *, _TABLE_SUFFIX FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", + None, # time_travel_timestamp + "SELECT *, _TABLE_SUFFIX AS _BF_TABLE_SUFFIX FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", id="wildcard-filter", ), ], From f26574b6983b5847e429170ee3c27b9119f7b3ba Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 12:17:31 -0500 Subject: [PATCH 10/18] correct number of columns in cache with offsets --- bigframes/session/bq_caching_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 9687ef7329..78c0333d2a 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -435,6 +435,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): ordering=order.TotalOrdering.from_offset_col(offset_column), ) .rename_columns(dict(zip(new_col_ids, prev_col_ids))) + .drop_columns([offset_column]) .node ) self._cached_executions[array_value.node] = cached_replacement From ab0e50a1efc63a602f7283385ecab5dd9c20c033 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 12:33:28 -0500 Subject: [PATCH 11/18] fix formatting --- bigframes/session/bq_caching_executor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 9fec651de8..14f194d6d5 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -397,9 +397,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) renamed = w_offsets.rename_columns(dict(zip(prev_col_ids, new_col_ids))) - sql = self.compiler.compile( - self.logical_plan(w_offsets.node), ordered=False - ) + sql = self.compiler.compile(self.logical_plan(w_offsets.node), ordered=False) tmp_table = self._sql_as_cached_temp_table( sql, From 89535e2c4cb7584ef9353b6fbea74db762b44c03 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 29 Apr 2025 17:34:06 +0000 Subject: [PATCH 12/18] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/session/bq_caching_executor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 9fec651de8..14f194d6d5 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -397,9 +397,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) renamed = w_offsets.rename_columns(dict(zip(prev_col_ids, new_col_ids))) - sql = self.compiler.compile( - self.logical_plan(w_offsets.node), ordered=False - ) + sql = self.compiler.compile(self.logical_plan(w_offsets.node), ordered=False) tmp_table = self._sql_as_cached_temp_table( sql, From d37bf5e5460bc6a679c5d8083174ba7c9272544c Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 14:55:29 -0500 Subject: [PATCH 13/18] revert datetime change, max_results change --- bigframes/core/compile/compiler.py | 4 ++-- bigframes/core/nodes.py | 2 ++ bigframes/core/tools/datetimes.py | 7 
+------ bigframes/session/_io/bigquery/__init__.py | 3 ++- bigframes/session/loader.py | 14 +------------- 5 files changed, 8 insertions(+), 22 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 7de7277f57..69fa2472c1 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -218,8 +218,8 @@ def _table_to_ibis( if ( source.at_time is not None or source.sql_predicate is not None - # TODO(tswast): make a more general way to check if the table node has pseudocolumns. - or source.table.table_id.endswith("*") + # ibis.table is not aware of pseudocolumns, so we compile to SQL ourselves. + or source.table.has_pseudocolumns ): import bigframes.session._io.bigquery diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 45f605507c..68ea4b722b 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -704,6 +704,7 @@ class GbqTable: physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field() is_physically_stored: bool = dataclasses.field() cluster_cols: typing.Optional[Tuple[str, ...]] + has_pseudocolumns: bool @staticmethod def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: @@ -725,6 +726,7 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: cluster_cols=None if table.clustering_fields is None else tuple(table.clustering_fields), + has_pseudocolumns=len(table_schema) > len(table.schema), ) def get_table_ref(self) -> bq.TableReference: diff --git a/bigframes/core/tools/datetimes.py b/bigframes/core/tools/datetimes.py index 26afdc7910..2abb86a2f3 100644 --- a/bigframes/core/tools/datetimes.py +++ b/bigframes/core/tools/datetimes.py @@ -52,7 +52,7 @@ def to_datetime( f"to datetime is not implemented. {constants.FEEDBACK_LINK}" ) - arg = bigframes.series.Series(arg) + arg = bigframes.series.Series(arg)._cached() if format and unit and arg.dtype in (bigframes.dtypes.INT_DTYPE, bigframes.dtypes.FLOAT_DTYPE): # type: ignore raise ValueError("cannot specify both format and unit") @@ -74,11 +74,6 @@ def to_datetime( ) assert unit is None - - # The following operations evaluate individual values to infer a format, - # so cache if needed. - arg = arg._cached(force=False) - as_datetime = arg._apply_unary_op( # type: ignore ops.ToDatetimeOp( format=format, diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index a6255dccaa..b5d8ebb283 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -378,6 +378,7 @@ def to_query( sql_predicate: Optional[str], max_results: Optional[int] = None, time_travel_timestamp: Optional[datetime.datetime] = None, + # pseudocolumns: Iterable[str] = (), ) -> str: """Compile query_or_table with conditions(filters, wildcards) to query.""" sub_query = ( @@ -393,7 +394,7 @@ def to_query( else: select_clause = "SELECT *" - if query_or_table.endswith("*"): + if query_or_table.endswith("*"): # TODO: get pseudocolumns here select_clause += ", _TABLE_SUFFIX AS _BF_TABLE_SUFFIX" time_travel_clause = "" diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index c37fb454c6..d074447130 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -463,7 +463,6 @@ def read_gbq_table( columns=columns, api_name=api_name, use_cache=use_cache, - max_results=max_results, ) # TODO(tswast): Handle pseudocolumns more generally. 
@@ -690,7 +689,6 @@ def read_gbq_query( cluster_candidates=[], api_name=api_name, configuration=configuration, - max_results=max_results, ) if self._metrics is not None: @@ -729,7 +727,6 @@ def _query_to_destination( api_name: str, configuration: dict = {"query": {"useQueryCache": True}}, do_clustering=True, - max_results: Optional[int] = None, ) -> Tuple[Optional[bigquery.TableReference], bigquery.QueryJob]: # If a dry_run indicates this is not a query type job, then don't # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement. @@ -738,16 +735,7 @@ def _query_to_destination( _, dry_run_job = self._start_query( query, job_config=dry_run_config, api_name=api_name ) - if ( - dry_run_job.statement_type != "SELECT" - # If max_results is set, we probably don't have > 10 GB of results, - # so avoid creating a destination table. This is necessary for - # handling max_results with wildcard tables, since those include a - # pseudocolumn that can only be materialized to anonymous results - # tables. See: b/405773140 for discussion about the _TABLE_SUFFIX - # pseudocolumn. - or max_results is not None - ): + if dry_run_job.statement_type != "SELECT": _, query_job = self._start_query(query, api_name=api_name) return query_job.destination, query_job From 4bf66b69eeba9f21b30fa322f83b90881c9aa3f3 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 17:08:41 -0500 Subject: [PATCH 14/18] add pseudocolumns to node --- bigframes/core/compile/compiler.py | 8 ++++++-- bigframes/core/nodes.py | 20 +++++++++++++++----- bigframes/core/schema.py | 8 +++++++- bigframes/core/tools/bigquery.py | 5 +++-- bigframes/session/_io/bigquery/__init__.py | 9 ++++++--- bigframes/session/loader.py | 16 +++++++++++++--- tests/unit/session/test_io_bigquery.py | 7 +++++++ 7 files changed, 57 insertions(+), 16 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 69fa2472c1..a82531a8ae 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -213,13 +213,13 @@ def _table_to_ibis( ) # Physical schema might include unused columns, unsupported datatypes like JSON physical_schema = ibis_bigquery.BigQuerySchema.to_ibis( - list(source.table.physical_schema) + list(source.table.physical_schema) + list(source.table.pseudocolumns) ) if ( source.at_time is not None or source.sql_predicate is not None # ibis.table is not aware of pseudocolumns, so we compile to SQL ourselves. - or source.table.has_pseudocolumns + or source.table.pseudocolumns ): import bigframes.session._io.bigquery @@ -228,6 +228,10 @@ def _table_to_ibis( columns=scan_cols, sql_predicate=source.sql_predicate, time_travel_timestamp=source.at_time, + # Need to include pseudocolumns in case we're doing a SELECT *, + # as those wouldn't normally be included. + # TODO(tswast): Is scan_cols every empty, where this would happen. + pseudocolumns=[field.name for field in source.table.pseudocolumns], ) return ibis_bigquery.Backend().sql(schema=physical_schema, query=sql) else: diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 68ea4b722b..bf0b8cea14 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -704,14 +704,14 @@ class GbqTable: physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field() is_physically_stored: bool = dataclasses.field() cluster_cols: typing.Optional[Tuple[str, ...]] - has_pseudocolumns: bool + pseudocolumns: Tuple[bq.SchemaField, ...] 
= dataclasses.field() @staticmethod def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: import bigframes.core.tools.bigquery # Avoid circular imports. # Subsetting fields with columns can reduce cost of row-hash default ordering - table_schema = bigframes.core.tools.bigquery.get_schema_and_pseudocolumns(table) + table_schema = table.schema if columns: schema = tuple(item for item in table_schema if item.name in columns) @@ -726,7 +726,7 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: cluster_cols=None if table.clustering_fields is None else tuple(table.clustering_fields), - has_pseudocolumns=len(table_schema) > len(table.schema), + pseudocolumns=tuple(bigframes.core.tools.bigquery.get_pseudocolumns(table)), ) def get_table_ref(self) -> bq.TableReference: @@ -737,7 +737,10 @@ def get_table_ref(self) -> bq.TableReference: @property @functools.cache def schema_by_id(self): - return {col.name: col for col in self.physical_schema} + return { + col.name: col + for col in itertools.chain(self.physical_schema, self.pseudocolumns) + } @dataclasses.dataclass(frozen=True) @@ -768,7 +771,14 @@ class ReadTableNode(LeafNode): def _validate(self): # enforce invariants - physical_names = set(map(lambda i: i.name, self.source.table.physical_schema)) + physical_names = set( + map( + lambda i: i.name, + itertools.chain( + self.source.table.physical_schema, self.source.table.pseudocolumns + ), + ) + ) if not set(scan.source_id for scan in self.scan_list.items).issubset( physical_names ): diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py index 6243ed86c8..eca124e424 100644 --- a/bigframes/core/schema.py +++ b/bigframes/core/schema.py @@ -16,6 +16,7 @@ from dataclasses import dataclass import functools +import itertools import typing from typing import Sequence @@ -56,7 +57,12 @@ def from_bq_table( items = [ SchemaItem(name, column_type_overrides.get(name, dtype)) for name, dtype in bigframes.dtypes.bf_type_from_type_kind( - bigframes.core.tools.bigquery.get_schema_and_pseudocolumns(table) + list( + itertools.chain( + table.schema, + bigframes.core.tools.bigquery.get_pseudocolumns(table), + ) + ) ).items() ] diff --git a/bigframes/core/tools/bigquery.py b/bigframes/core/tools/bigquery.py index 13853a3ada..249194215c 100644 --- a/bigframes/core/tools/bigquery.py +++ b/bigframes/core/tools/bigquery.py @@ -21,10 +21,11 @@ import google.cloud.bigquery as bigquery -def get_schema_and_pseudocolumns( +def get_pseudocolumns( table: bigquery.table.Table, ) -> list[bigquery.SchemaField]: - fields = list(table.schema) + """Which pseudocolumns are available for this table?""" + fields = [] # TODO(tswast): Add _PARTITIONTIME and/or _PARTIONDATE for injestion # time partitioned tables. 
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index b5d8ebb283..f90d30811e 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -378,7 +378,7 @@ def to_query( sql_predicate: Optional[str], max_results: Optional[int] = None, time_travel_timestamp: Optional[datetime.datetime] = None, - # pseudocolumns: Iterable[str] = (), + pseudocolumns: Iterable[str] = (), ) -> str: """Compile query_or_table with conditions(filters, wildcards) to query.""" sub_query = ( @@ -394,8 +394,11 @@ def to_query( else: select_clause = "SELECT *" - if query_or_table.endswith("*"): # TODO: get pseudocolumns here - select_clause += ", _TABLE_SUFFIX AS _BF_TABLE_SUFFIX" + if pseudocolumns: + pseudo_sql = ", ".join( + f"{column} AS _BF_{column}" for column in pseudocolumns + ) + select_clause += f", {pseudo_sql}" time_travel_clause = "" if time_travel_timestamp is not None: diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index d074447130..5034abadb0 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -363,6 +363,7 @@ def read_gbq_table( enable_snapshot: bool = True, ) -> dataframe.DataFrame: import bigframes._tools.strings + import bigframes.core.tools.bigquery import bigframes.dataframe as dataframe # --------------------------------- @@ -446,15 +447,18 @@ def read_gbq_table( all_columns: Iterable[str] = ( itertools.chain(index_cols, columns) if columns else () ) + pseudocolumns = bigframes.core.tools.bigquery.get_pseudocolumns(table) query = bf_io_bigquery.to_query( query, columns=all_columns, sql_predicate=bf_io_bigquery.compile_filters(filters) if filters else None, + max_results=max_results, # We're executing the query, so we don't need time travel for # determinism. time_travel_timestamp=None, + pseudocolumns=[field.name for field in pseudocolumns], ) df = self.read_gbq_query( @@ -465,9 +469,15 @@ def read_gbq_table( use_cache=use_cache, ) - # TODO(tswast): Handle pseudocolumns more generally. 
- if "_BF_TABLE_SUFFIX" in df.columns: - df = df.rename(columns={"_BF_TABLE_SUFFIX": "_TABLE_SUFFIX"}) + if pseudocolumns: + df = df.rename( + columns=dict( + zip( + [f"_BF_{field.name}" for field in pseudocolumns], + [field.name for field in pseudocolumns], + ) + ) + ) return df # ----------------------------------------- diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 7f1eadf458..0de153a571 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -331,6 +331,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) "filters", "max_results", "time_travel_timestamp", + "pseudocolumns", "expected_output", ), [ @@ -345,6 +346,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) datetime.datetime( 2024, 5, 14, 12, 42, 36, 125125, tzinfo=datetime.timezone.utc ), + [], # pseudocolumns ( "SELECT `row_index`, `string_col` FROM `test_table` " "FOR SYSTEM_TIME AS OF TIMESTAMP('2024-05-14T12:42:36.125125+00:00') " @@ -370,6 +372,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) datetime.datetime( 2024, 5, 14, 12, 42, 36, 125125, tzinfo=datetime.timezone.utc ), + [], # pseudocolumns ( """SELECT `rowindex`, `string_col` FROM (SELECT rowindex, @@ -388,6 +391,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], None, # max_results None, # time_travel_timestamp + [], # pseudocolumns "SELECT `col_a`, `col_b` FROM `test_table`", id="table-columns", ), @@ -397,6 +401,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [("date_col", ">", "2022-10-20")], None, # max_results None, # time_travel_timestamp + [], # pseudocolumns "SELECT * FROM `test_table` WHERE `date_col` > '2022-10-20'", id="table-filter", ), @@ -406,6 +411,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], None, # max_results None, # time_travel_timestamp + ["_TABLE_SUFFIX"], # pseudocolumns "SELECT *, _TABLE_SUFFIX AS _BF_TABLE_SUFFIX FROM `test_table*`", id="wildcard-no_params", ), @@ -415,6 +421,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [("_TABLE_SUFFIX", ">", "2022-10-20")], None, # max_results None, # time_travel_timestamp + ["_TABLE_SUFFIX"], # pseudocolumns "SELECT *, _TABLE_SUFFIX AS _BF_TABLE_SUFFIX FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", id="wildcard-filter", ), From 8c964982dadd02fdbb27ad9ea08031394f6c3cb6 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 17:10:04 -0500 Subject: [PATCH 15/18] fix unit tests --- tests/unit/session/test_io_bigquery.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 0de153a571..56a44f0b86 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -433,6 +433,7 @@ def test_to_query( filters, max_results, time_travel_timestamp, + pseudocolumns, expected_output, ): query = io_bq.to_query( @@ -441,6 +442,7 @@ def test_to_query( sql_predicate=io_bq.compile_filters(filters), max_results=max_results, time_travel_timestamp=time_travel_timestamp, + pseudocolumns=pseudocolumns, ) assert query == expected_output From e1780a64ef8ffd11917734c1a7f36c8a9843b3a7 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 17:11:53 -0500 Subject: [PATCH 16/18] actually fix unit tests --- 
tests/unit/session/test_io_bigquery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 56a44f0b86..54d849a106 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -412,7 +412,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) None, # max_results None, # time_travel_timestamp ["_TABLE_SUFFIX"], # pseudocolumns - "SELECT *, _TABLE_SUFFIX AS _BF_TABLE_SUFFIX FROM `test_table*`", + "SELECT *, _TABLE_SUFFIX AS _BF__TABLE_SUFFIX FROM `test_table*`", id="wildcard-no_params", ), pytest.param( @@ -422,7 +422,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) None, # max_results None, # time_travel_timestamp ["_TABLE_SUFFIX"], # pseudocolumns - "SELECT *, _TABLE_SUFFIX AS _BF_TABLE_SUFFIX FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", + "SELECT *, _TABLE_SUFFIX AS _BF__TABLE_SUFFIX FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", id="wildcard-filter", ), ], From b027b5112866e9de61a2bf41a92f043d0d26dbe9 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 29 Apr 2025 17:36:48 -0500 Subject: [PATCH 17/18] try to rename as part of compile --- bigframes/core/compile/compiler.py | 21 ++++++++++- bigframes/session/bq_caching_executor.py | 47 +++++------------------- 2 files changed, 28 insertions(+), 40 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index a82531a8ae..9914935b47 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -25,7 +25,7 @@ import pyarrow as pa from bigframes import dtypes, operations -from bigframes.core import expression +from bigframes.core import expression, identifiers import bigframes.core.compile.compiled as compiled import bigframes.core.compile.concat as concat_impl import bigframes.core.compile.configs as configs @@ -34,6 +34,7 @@ import bigframes.core.nodes as nodes import bigframes.core.ordering as bf_ordering import bigframes.core.rewrite as rewrites +import bigframes.core.utils as utils if typing.TYPE_CHECKING: import bigframes.core @@ -73,7 +74,23 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult: ordering if ordering.referenced_columns.issubset(result_node.ids) else None ) assert (not request.materialize_all_order_keys) or (output_order is not None) - return configs.CompileResult(sql, result_node.schema.to_bigquery(), output_order) + + # Pseudocolumns can be queried and even written to anonymous query + # results tables, but they can't be materialized to an explicit + # destination table. Therefore, we rename the columns to all be writable + # before executing the SQL. See: b/405773140 for discussion about the + # _TABLE_SUFFIX pseudocolumn. 
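+    # Note that only the returned schema is derived from the renamed node; the
+    # SQL string above was already compiled from the original column ids.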
+ prev_col_ids = result_node.schema.names + new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) + renamed_node = result_node.remap_refs( + dict( + zip( + (identifiers.ColumnId(name=name) for name in prev_col_ids), + (identifiers.ColumnId(name=name) for name in new_col_ids), + ) + ) + ) + return configs.CompileResult(sql, renamed_node.schema.to_bigquery(), output_order) def _replace_unsupported_ops(node: nodes.BigFrameNode): diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 14f194d6d5..dd0ac52bcb 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -33,7 +33,6 @@ import bigframes.core.nodes as nodes import bigframes.core.ordering as order import bigframes.core.tree_properties as tree_properties -import bigframes.core.utils as utils import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.features @@ -355,32 +354,18 @@ def _cache_with_cluster_cols( self, array_value: bigframes.core.ArrayValue, cluster_cols: Sequence[str] ): """Executes the query and uses the resulting table to rewrite future executions.""" - - # Pseudocolumns can be queried and even written to anonymous query - # results tables, but they can't be materialized to an explicit - # destination table. Therefore, we rename the columns to all be writable - # before executing the SQL. See: b/405773140 for discussion about the - # _TABLE_SUFFIX pseudocolumn. - prev_col_ids = array_value.column_ids - new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) - renamed = array_value.rename_columns(dict(zip(prev_col_ids, new_col_ids))) - sql, schema, ordering_info = self.compiler.compile_raw( - self.logical_plan(renamed.node) + self.logical_plan(array_value.node) ) tmp_table = self._sql_as_cached_temp_table( sql, schema, cluster_cols=bq_io.select_cluster_cols(schema, cluster_cols), ) - cached_replacement = ( - renamed.as_cached( - cache_table=self.bqclient.get_table(tmp_table), - ordering=ordering_info, - ) - .rename_columns(dict(zip(new_col_ids, prev_col_ids))) - .node - ) + cached_replacement = array_value.as_cached( + cache_table=self.bqclient.get_table(tmp_table), + ordering=ordering_info, + ).node self._cached_executions[array_value.node] = cached_replacement def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): @@ -388,15 +373,6 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): offset_column = bigframes.core.guid.generate_guid("bigframes_offsets") w_offsets, offset_column = array_value.promote_offsets() - # Pseudocolumns can be queried and even written to anonymous query - # results tables, but they can't be materialized to an explicit - # destination table. Therefore, we rename the columns to all be writable - # before executing the SQL. See: b/405773140 for discussion about the - # _TABLE_SUFFIX pseudocolumn. 
- prev_col_ids = w_offsets.column_ids - new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) - renamed = w_offsets.rename_columns(dict(zip(prev_col_ids, new_col_ids))) - sql = self.compiler.compile(self.logical_plan(w_offsets.node), ordered=False) tmp_table = self._sql_as_cached_temp_table( @@ -404,15 +380,10 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): w_offsets.schema.to_bigquery(), cluster_cols=[offset_column], ) - cached_replacement = ( - renamed.as_cached( - cache_table=self.bqclient.get_table(tmp_table), - ordering=order.TotalOrdering.from_offset_col(offset_column), - ) - .rename_columns(dict(zip(new_col_ids, prev_col_ids))) - .drop_columns([offset_column]) - .node - ) + cached_replacement = array_value.as_cached( + cache_table=self.bqclient.get_table(tmp_table), + ordering=order.TotalOrdering.from_offset_col(offset_column), + ).node self._cached_executions[array_value.node] = cached_replacement def _cache_with_session_awareness( From 9a778dbf07d58e0ca97a9d5db356f858cb452caf Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Wed, 30 Apr 2025 09:25:37 -0500 Subject: [PATCH 18/18] use correct node for table schema --- bigframes/session/bq_caching_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 61a0728c6c..3c752527fe 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -390,7 +390,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): tmp_table = self._sql_as_cached_temp_table( sql, - w_offsets.schema.to_bigquery(), + renamed.schema.to_bigquery(), cluster_cols=[offset_column], ) cached_replacement = array_value.as_cached(