diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py index 41a6882cd7..f08d51d193 100644 --- a/bigframes/core/array_value.py +++ b/bigframes/core/array_value.py @@ -17,7 +17,7 @@ import datetime import functools import typing -from typing import Iterable, List, Mapping, Optional, Sequence, Tuple +from typing import Dict, Iterable, List, Mapping, Optional, Sequence, Tuple import warnings import google.cloud.bigquery @@ -176,10 +176,15 @@ def as_cached( self: ArrayValue, cache_table: google.cloud.bigquery.Table, ordering: Optional[orderings.RowOrdering], + *, + renames: Optional[Dict[str, str]] = None, ) -> ArrayValue: """ Replace the node with an equivalent one that references a table where the value has been materialized to. """ + if renames is None: + renames = {} + table = nodes.GbqTable.from_table(cache_table) source = nodes.BigqueryDataSource( table, ordering=ordering, n_rows=cache_table.num_rows @@ -187,7 +192,11 @@ def as_cached( # Assumption: GBQ cached table uses field name as bq column name scan_list = nodes.ScanList( tuple( - nodes.ScanItem(field.id, field.dtype, field.id.name) + nodes.ScanItem( + field.id, + field.dtype, + renames.get(field.id.name, field.id.name), + ) for field in self.node.fields ) ) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index fb5399b7cb..9ba7b65ea5 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -213,9 +213,14 @@ def _table_to_ibis( ) # Physical schema might include unused columns, unsupported datatypes like JSON physical_schema = ibis_bigquery.BigQuerySchema.to_ibis( - list(source.table.physical_schema) + list(source.table.physical_schema) + list(source.table.pseudocolumns) ) - if source.at_time is not None or source.sql_predicate is not None: + if ( + source.at_time is not None + or source.sql_predicate is not None + # ibis.table is not aware of pseudocolumns, so we compile to SQL ourselves. + or source.table.pseudocolumns + ): import bigframes.session._io.bigquery sql = bigframes.session._io.bigquery.to_query( @@ -223,6 +228,10 @@ def _table_to_ibis( columns=scan_cols, sql_predicate=source.sql_predicate, time_travel_timestamp=source.at_time, + # Need to include pseudocolumns in case we're doing a SELECT *, + # as those wouldn't normally be included. + # TODO(tswast): Is scan_cols ever empty? If not, this might not be necessary. + pseudocolumns=[field.name for field in source.table.pseudocolumns], ) return ibis_bigquery.Backend().sql(schema=physical_schema, query=sql) else: diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 0fbfe7bd37..bf0b8cea14 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -704,14 +704,19 @@ class GbqTable: physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field() is_physically_stored: bool = dataclasses.field() cluster_cols: typing.Optional[Tuple[str, ...]] + pseudocolumns: Tuple[bq.SchemaField, ...] = dataclasses.field() @staticmethod def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: + import bigframes.core.tools.bigquery # Avoid circular imports. 
+ # Subsetting fields with columns can reduce cost of row-hash default ordering + table_schema = table.schema + if columns: - schema = tuple(item for item in table.schema if item.name in columns) + schema = tuple(item for item in table_schema if item.name in columns) else: - schema = tuple(table.schema) + schema = tuple(table_schema) return GbqTable( project_id=table.project, dataset_id=table.dataset_id, @@ -721,6 +726,7 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: cluster_cols=None if table.clustering_fields is None else tuple(table.clustering_fields), + pseudocolumns=tuple(bigframes.core.tools.bigquery.get_pseudocolumns(table)), ) def get_table_ref(self) -> bq.TableReference: @@ -731,7 +737,10 @@ def get_table_ref(self) -> bq.TableReference: @property @functools.cache def schema_by_id(self): - return {col.name: col for col in self.physical_schema} + return { + col.name: col + for col in itertools.chain(self.physical_schema, self.pseudocolumns) + } @dataclasses.dataclass(frozen=True) @@ -762,7 +771,14 @@ class ReadTableNode(LeafNode): def _validate(self): # enforce invariants - physical_names = set(map(lambda i: i.name, self.source.table.physical_schema)) + physical_names = set( + map( + lambda i: i.name, + itertools.chain( + self.source.table.physical_schema, self.source.table.pseudocolumns + ), + ) + ) if not set(scan.source_id for scan in self.scan_list.items).issubset( physical_names ): diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py index 4f636ab210..eca124e424 100644 --- a/bigframes/core/schema.py +++ b/bigframes/core/schema.py @@ -16,6 +16,7 @@ from dataclasses import dataclass import functools +import itertools import typing from typing import Sequence @@ -48,15 +49,24 @@ def from_bq_table( typing.Dict[str, bigframes.dtypes.Dtype] ] = None, ): + # Avoid circular imports. + import bigframes.core.tools.bigquery + if column_type_overrides is None: column_type_overrides = {} - items = tuple( + items = [ SchemaItem(name, column_type_overrides.get(name, dtype)) for name, dtype in bigframes.dtypes.bf_type_from_type_kind( - table.schema + list( + itertools.chain( + table.schema, + bigframes.core.tools.bigquery.get_pseudocolumns(table), + ) + ) ).items() - ) - return ArraySchema(items) + ] + + return ArraySchema(tuple(items)) @property def names(self) -> typing.Tuple[str, ...]: diff --git a/bigframes/core/tools/bigquery.py b/bigframes/core/tools/bigquery.py new file mode 100644 index 0000000000..249194215c --- /dev/null +++ b/bigframes/core/tools/bigquery.py @@ -0,0 +1,40 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Private helpers for loading a BigQuery table as a BigQuery DataFrames DataFrame. 
+"""
+
+from __future__ import annotations
+
+import google.cloud.bigquery as bigquery
+
+
+def get_pseudocolumns(
+    table: bigquery.table.Table,
+) -> list[bigquery.SchemaField]:
+    """Which pseudocolumns are available for this table?"""
+    fields = []
+
+    # TODO(tswast): Add _PARTITIONTIME and/or _PARTITIONDATE for ingestion
+    # time partitioned tables.
+    if table.table_id.endswith("*"):
+        fields.append(
+            bigquery.SchemaField(
+                "_TABLE_SUFFIX",
+                "STRING",
+            )
+        )
+
+    return fields
diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py
index 495523d2fc..ba3fdcfd4b 100644
--- a/bigframes/core/utils.py
+++ b/bigframes/core/utils.py
@@ -147,6 +147,26 @@ def label_to_identifier(label: typing.Hashable, strict: bool = False) -> str:
     elif identifier[0].isdigit():
         # first character must be letter or underscore
         identifier = "_" + identifier
+
+    # Except in special circumstances (true anonymous query results tables),
+    # field names are not allowed to start with these (case-insensitive)
+    # prefixes.
+    # _PARTITION, _TABLE_, _FILE_, _ROW_TIMESTAMP, __ROOT__ and _COLIDENTIFIER
+    if any(
+        identifier.casefold().startswith(invalid_prefix.casefold())
+        for invalid_prefix in (
+            "_PARTITION",
+            "_TABLE_",
+            "_FILE_",
+            "_ROW_TIMESTAMP",
+            "__ROOT__",
+            "_COLIDENTIFIER",
+        )
+    ):
+        # Remove leading _ character(s) to avoid collisions with preserved
+        # prefixes.
+        identifier = re.sub("^_+", "", identifier)
+
     return identifier
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 6df9424e3b..85882b3497 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -377,6 +377,7 @@ def to_query(
     sql_predicate: Optional[str],
     max_results: Optional[int] = None,
     time_travel_timestamp: Optional[datetime.datetime] = None,
+    pseudocolumns: Iterable[str] = (),
 ) -> str:
     """Compile query_or_table with conditions(filters, wildcards) to query."""
     sub_query = (
@@ -392,6 +393,12 @@ def to_query(
     else:
         select_clause = "SELECT *"
 
+    if pseudocolumns:
+        pseudo_sql = ", ".join(
+            f"{column} AS _BF_{column}" for column in pseudocolumns
+        )
+        select_clause += f", {pseudo_sql}"
+
     time_travel_clause = ""
     if time_travel_timestamp is not None:
         time_travel_literal = bigframes.core.sql.simple_literal(time_travel_timestamp)
diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index 8d8f247185..6b1cb99c65 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -101,7 +101,16 @@ def validate_table(
     # Anonymous dataset, does not support snapshot ever
     if table.dataset_id.startswith("_"):
         pass
+    # Only true tables support time travel
+    elif table.table_id.endswith("*"):
+        msg = bfe.format_message(
+            "Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. "
+            "Attempting query without time travel. Be aware that "
+            "modifications to the underlying data may result in errors or "
+            "unexpected behavior."
+ ) + warnings.warn(msg, category=bfe.TimeTravelDisabledWarning) elif table.table_type != "TABLE": if table.table_type == "MATERIALIZED_VIEW": msg = bfe.format_message( @@ -137,7 +146,7 @@ def validate_table( sql_predicate=filter_str, time_travel_timestamp=None, ) - # Any erorrs here should just be raised to user + # Any errors here should just be raised to user bqclient.query_and_wait( snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True) ) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 4c10d76253..1dfc3c0144 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -33,6 +33,7 @@ import bigframes.core.nodes as nodes import bigframes.core.ordering as order import bigframes.core.tree_properties as tree_properties +import bigframes.core.utils as utils import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.features @@ -346,8 +347,13 @@ def _cache_with_cluster_cols( ): """Executes the query and uses the resulting table to rewrite future executions.""" + prev_col_ids = array_value.column_ids + new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) + renames = dict(zip(prev_col_ids, new_col_ids)) + renamed = array_value.rename_columns(renames) + sql, schema, ordering_info = self.compiler.compile_raw( - self.logical_plan(array_value.node) + self.logical_plan(renamed.node) ) tmp_table = self._sql_as_cached_temp_table( sql, @@ -357,6 +363,7 @@ def _cache_with_cluster_cols( cached_replacement = array_value.as_cached( cache_table=self.bqclient.get_table(tmp_table), ordering=ordering_info, + renames=renames, ).node self._cached_executions[array_value.node] = cached_replacement @@ -364,16 +371,23 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" offset_column = bigframes.core.guid.generate_guid("bigframes_offsets") w_offsets, offset_column = array_value.promote_offsets() - sql = self.compiler.compile(self.logical_plan(w_offsets.node), ordered=False) + + prev_col_ids = w_offsets.column_ids + new_col_ids, _ = utils.get_standardized_ids(prev_col_ids) + renames = dict(zip(prev_col_ids, new_col_ids)) + renamed = w_offsets.rename_columns(renames) + + sql = self.compiler.compile(self.logical_plan(renamed.node), ordered=False) tmp_table = self._sql_as_cached_temp_table( sql, - w_offsets.schema.to_bigquery(), + renamed.schema.to_bigquery(), cluster_cols=[offset_column], ) cached_replacement = array_value.as_cached( cache_table=self.bqclient.get_table(tmp_table), ordering=order.TotalOrdering.from_offset_col(offset_column), + renames=renames, ).node self._cached_executions[array_value.node] = cached_replacement diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index f748f0fd76..5f1d93cc02 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -22,6 +22,7 @@ import os import typing from typing import ( + cast, Dict, Generator, Hashable, @@ -48,6 +49,7 @@ import bigframes.core as core import bigframes.core.blocks as blocks import bigframes.core.schema as schemata +import bigframes.core.tools.bigquery import bigframes.dtypes import bigframes.formatting_helpers as formatting_helpers from bigframes.session import dry_runs @@ -518,16 +520,13 @@ def read_gbq_table( # clustered tables, so fallback to a query. We do this here so that # the index is consistent with tables that have primary keys, even # when max_results is set. 
- # TODO(b/338419730): We don't need to fallback to a query for wildcard - # tables if we allow some non-determinism when time travel isn't supported. - if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix( - table_id - ): + if max_results is not None: # TODO(b/338111344): If we are running a query anyway, we might as # well generate ROW_NUMBER() at the same time. all_columns: Iterable[str] = ( itertools.chain(index_cols, columns) if columns else () ) + pseudocolumns = bigframes.core.tools.bigquery.get_pseudocolumns(table) query = bf_io_bigquery.to_query( table_id, columns=all_columns, @@ -538,9 +537,10 @@ def read_gbq_table( # We're executing the query, so we don't need time travel for # determinism. time_travel_timestamp=None, + pseudocolumns=[field.name for field in pseudocolumns], ) - return self.read_gbq_query( # type: ignore # for dry_run overload + df = self.read_gbq_query( # type: ignore # for dry_run overload query, index_col=index_cols, columns=columns, @@ -549,6 +549,17 @@ def read_gbq_table( dry_run=dry_run, ) + if pseudocolumns and not dry_run: + df = cast(dataframe.DataFrame, df,).rename( + columns=dict( + zip( + [f"_BF_{field.name}" for field in pseudocolumns], + [field.name for field in pseudocolumns], + ) + ) + ) + return df + if dry_run: return dry_runs.get_table_stats(table) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 362d736aeb..fad43beee5 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -14,6 +14,7 @@ import io import operator +import re import sys import tempfile import typing @@ -5065,6 +5066,27 @@ def test_df_cached(scalars_df_index): pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas()) +def test_df_cached_w_wildcard_table(unordered_session): + """Test the `cache()` API with a DataFrame that contains pseudocolumns from wildcard tables + + Regression test for internal issue b/405773140. + + Uses unordered_session to avoid full table scan. 
+ """ + df = unordered_session.read_gbq( + "bigquery-public-data.google_analytics_sample.ga_sessions_*" + ) + df = ( + df[df["_TABLE_SUFFIX"] == "20161204"] + .groupby( + ["visitorId", "visitNumber", "visitId", "_TABLE_SUFFIX"], as_index=False + ) + .size() + ) + df_cached_copy = df.cache() + pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas()) + + def test_assign_after_binop_row_joins(): pd_df = pd.DataFrame( { @@ -5299,6 +5321,19 @@ def test_to_gbq_and_create_dataset(session, scalars_df_index, dataset_id_not_cre assert not loaded_scalars_df_index.empty +def test_read_gbq_to_pandas_wildcard(unordered_session: bigframes.Session): + with pytest.warns( + bigframes.exceptions.TimeTravelDisabledWarning, + match=re.escape("Wildcard tables do not support FOR SYSTEM_TIME"), + ): + df = unordered_session.read_gbq("bigquery-public-data.noaa_gsod.gsod*") + df = df[df["_TABLE_SUFFIX"] == "1929"][["da", "mo", "year", "max"]] + df.to_pandas() + rows, columns = df.shape + assert rows > 0 + assert columns == 4 + + def test_read_gbq_to_pandas_no_exec(unordered_session: bigframes.Session): metrics = unordered_session._metrics execs_pre = metrics.execution_count diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index e12db3f598..34fa9b4c78 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -656,6 +656,30 @@ def test_to_gbq_w_None_column_names( ) +def test_to_gbq_w_wildcard_table(unordered_session, dataset_id): + """Test the `to_gbq` API with a DataFrame that contains pseudocolumns from wildcard tables + + Regression test for internal issue b/405773140. + + Uses unordered_session to avoid full table scan. + """ + destination_table = f"{dataset_id}.test_to_gbq_w_wildcard_table" + df = unordered_session.read_gbq( + "bigquery-public-data.google_analytics_sample.ga_sessions_*" + ) + df = df[df["_TABLE_SUFFIX"] == "20161204"][ + ["visitorId", "visitNumber", "visitId", "_TABLE_SUFFIX"] + ] + df.to_gbq(destination_table, if_exists="replace") + + bf_result = bpd.read_gbq(destination_table) + pd.testing.assert_index_equal( + bf_result.columns, + # Remove leading _ to allow serialization. + pd.Index(["visitorId", "visitNumber", "visitId", "TABLE_SUFFIX"]), + ) + + @pytest.mark.parametrize( "clustering_columns", [ diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index c63bf8e12b..27b9ba7f14 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -4143,6 +4143,16 @@ def test_apply_numpy_ufunc(scalars_dfs, ufunc): assert_series_equal(bf_result, pd_result) +def test_series_cached_w_wildcard_table(unordered_session): + """Test the `cache()` API with a Series that contains pseudocolumns from wildcard tables + + Regression test for internal issue b/405773140. 
+ """ + df = unordered_session.read_gbq("bigquery-public-data.usa_names.usa_1910_20*") + series_cached_copy = df["_TABLE_SUFFIX"].cache() + assert len(series_cached_copy) == len(df) + + @pytest.mark.parametrize( ("ufunc",), [ diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index ad01a95509..e231b309cd 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -575,7 +575,8 @@ def test_read_gbq_wildcard( if columns else [ field.name - for field in table_metadata.schema + for field in list(table_metadata.schema) + + [bigquery.SchemaField("_TABLE_SUFFIX", "STRING")] if field.name not in index_col and field.name not in columns ] ) diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index af2c7714ab..54d849a106 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -331,6 +331,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) "filters", "max_results", "time_travel_timestamp", + "pseudocolumns", "expected_output", ), [ @@ -345,6 +346,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) datetime.datetime( 2024, 5, 14, 12, 42, 36, 125125, tzinfo=datetime.timezone.utc ), + [], # pseudocolumns ( "SELECT `row_index`, `string_col` FROM `test_table` " "FOR SYSTEM_TIME AS OF TIMESTAMP('2024-05-14T12:42:36.125125+00:00') " @@ -370,6 +372,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) datetime.datetime( 2024, 5, 14, 12, 42, 36, 125125, tzinfo=datetime.timezone.utc ), + [], # pseudocolumns ( """SELECT `rowindex`, `string_col` FROM (SELECT rowindex, @@ -387,7 +390,8 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) ["col_a", "col_b"], [], None, # max_results - None, # time_travel_timestampe + None, # time_travel_timestamp + [], # pseudocolumns "SELECT `col_a`, `col_b` FROM `test_table`", id="table-columns", ), @@ -396,7 +400,8 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], [("date_col", ">", "2022-10-20")], None, # max_results - None, # time_travel_timestampe + None, # time_travel_timestamp + [], # pseudocolumns "SELECT * FROM `test_table` WHERE `date_col` > '2022-10-20'", id="table-filter", ), @@ -405,8 +410,9 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], [], None, # max_results - None, # time_travel_timestampe - "SELECT * FROM `test_table*`", + None, # time_travel_timestamp + ["_TABLE_SUFFIX"], # pseudocolumns + "SELECT *, _TABLE_SUFFIX AS _BF__TABLE_SUFFIX FROM `test_table*`", id="wildcard-no_params", ), pytest.param( @@ -414,8 +420,9 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [], [("_TABLE_SUFFIX", ">", "2022-10-20")], None, # max_results - None, # time_travel_timestampe - "SELECT * FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", + None, # time_travel_timestamp + ["_TABLE_SUFFIX"], # pseudocolumns + "SELECT *, _TABLE_SUFFIX AS _BF__TABLE_SUFFIX FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'", id="wildcard-filter", ), ], @@ -426,6 +433,7 @@ def test_to_query( filters, max_results, time_travel_timestamp, + pseudocolumns, expected_output, ): query = io_bq.to_query( @@ -434,6 +442,7 @@ def test_to_query( sql_predicate=io_bq.compile_filters(filters), max_results=max_results, time_travel_timestamp=time_travel_timestamp, + pseudocolumns=pseudocolumns, ) assert query == 
expected_output
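
Usage sketch (not part of the patch): the snippet below illustrates the end-to-end behavior this change enables, mirroring test_df_cached_w_wildcard_table and test_to_gbq_w_wildcard_table above. The public dataset, shard suffix, and column names are taken from those tests; the destination table is a placeholder. Treat it as an illustrative sketch of the feature, not an authoritative API reference.

    import bigframes.pandas as bpd

    # Reading a wildcard table now surfaces the _TABLE_SUFFIX pseudocolumn as a column.
    df = bpd.read_gbq("bigquery-public-data.google_analytics_sample.ga_sessions_*")

    # Filtering on the pseudocolumn restricts the scan to a single shard.
    shard = df[df["_TABLE_SUFFIX"] == "20161204"]
    subset = shard[["visitorId", "visitNumber", "visitId", "_TABLE_SUFFIX"]]

    # cache() previously failed on such frames (internal issue b/405773140) because
    # _TABLE_SUFFIX is not a legal column name for a materialized table; the rename
    # plumbing added in bq_caching_executor.py makes it work.
    cached = subset.cache()

    # to_gbq() also works; the reserved prefix is stripped on write (see the
    # label_to_identifier change), so the column is materialized as TABLE_SUFFIX.
    # The destination table name below is a placeholder.
    cached.to_gbq("my_project.my_dataset.ga_20161204", if_exists="replace")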
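
The to_gbq expectation above (TABLE_SUFFIX rather than _TABLE_SUFFIX) follows from the label_to_identifier change in bigframes/core/utils.py. Below is a simplified, self-contained re-implementation of just that prefix-stripping rule, for illustration only; the real helper also handles non-string labels, strict mode, and other normalization.

    import re

    RESERVED_PREFIXES = (
        "_PARTITION",
        "_TABLE_",
        "_FILE_",
        "_ROW_TIMESTAMP",
        "__ROOT__",
        "_COLIDENTIFIER",
    )

    def strip_reserved_prefix(identifier: str) -> str:
        # BigQuery rejects column names starting with these (case-insensitive)
        # prefixes, so leading underscores are removed before materializing.
        if any(
            identifier.casefold().startswith(prefix.casefold())
            for prefix in RESERVED_PREFIXES
        ):
            return re.sub("^_+", "", identifier)
        return identifier

    assert strip_reserved_prefix("_TABLE_SUFFIX") == "TABLE_SUFFIX"
    assert strip_reserved_prefix("visitId") == "visitId"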