
feat: expose _TABLE_SUFFIX pseudocolumn for wildcard tables #1689
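As a rough usage sketch of what this feature enables (the public dataset name and the filter value are illustrative assumptions, not taken from the PR), exposing the pseudocolumn lets a wildcard read filter on the shard suffix directly:

```python
import bigframes.pandas as bpd

# Hypothetical example: read a wildcard table and filter on the shard suffix.
# The dataset id and the "2020" cutoff are made up for illustration.
df = bpd.read_gbq("bigquery-public-data.noaa_gsod.gsod*")
recent = df[df["_TABLE_SUFFIX"] >= "2020"]
```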

Open
wants to merge 28 commits into main
Changes from all commits (28 commits)
a9edb2a
perf: defer query in `read_gbq` with wildcard tables
tswast Apr 27, 2025
df795b1
remove obsolete comments
tswast Apr 27, 2025
f81fe4e
Merge remote-tracking branch 'origin/main' into b405773140-wildcard
tswast Apr 28, 2025
79f4c58
use sql node instead of ibis table node to keep select * from omittin…
tswast Apr 28, 2025
5b0d0a0
test with cache and to_gbq
tswast Apr 29, 2025
118964b
rename columns before caching
tswast Apr 29, 2025
ca33463
remove unnecessary comment
tswast Apr 29, 2025
e546745
Merge remote-tracking branch 'origin/main' into b405773140-wildcard
tswast Apr 29, 2025
4897ca4
add missing import
tswast Apr 29, 2025
e1a7341
do not materialize _TABLE_SUFFIX
tswast Apr 29, 2025
af06200
fix unit tests
tswast Apr 29, 2025
af5c036
Merge branch 'main' into b405773140-wildcard
tswast Apr 29, 2025
f26574b
correct number of columns in cache with offsets
tswast Apr 29, 2025
dd05c2d
Merge branch 'main' into b405773140-wildcard
tswast Apr 29, 2025
ab0e50a
fix formatting
tswast Apr 29, 2025
89535e2
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 29, 2025
8bb09d5
Merge branch 'b405773140-wildcard' of https://github.com/googleapis/p…
gcf-owl-bot[bot] Apr 29, 2025
40e2e77
Merge branch 'main' into b405773140-wildcard
tswast Apr 29, 2025
d37bf5e
revert datetime change, max_results change
tswast Apr 29, 2025
2f25f8d
Merge remote-tracking branch 'origin/b405773140-wildcard' into b40577…
tswast Apr 29, 2025
4bf66b6
add pseudocolumns to node
tswast Apr 29, 2025
8c96498
fix unit tests
tswast Apr 29, 2025
e1780a6
actually fix unit tests
tswast Apr 29, 2025
b027b51
try to rename as part of compile
tswast Apr 29, 2025
00fbd91
add renames to as cached table
tswast Apr 30, 2025
9a778db
use correct node for table schema
tswast Apr 30, 2025
d076cd3
Merge branch 'main' into b405773140-wildcard
tswast Apr 30, 2025
7d8ddcc
Merge remote-tracking branch 'origin/main' into b405773140-pseudocolumns
tswast May 5, 2025
13 changes: 11 additions & 2 deletions bigframes/core/array_value.py
@@ -17,7 +17,7 @@
import datetime
import functools
import typing
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
from typing import Dict, Iterable, List, Mapping, Optional, Sequence, Tuple
import warnings

import google.cloud.bigquery
@@ -176,18 +176,27 @@ def as_cached(
self: ArrayValue,
cache_table: google.cloud.bigquery.Table,
ordering: Optional[orderings.RowOrdering],
*,
renames: Optional[Dict[str, str]] = None,
) -> ArrayValue:
"""
Replace the node with an equivalent one that references a table where the value has been materialized to.
"""
if renames is None:
renames = {}

table = nodes.GbqTable.from_table(cache_table)
source = nodes.BigqueryDataSource(
table, ordering=ordering, n_rows=cache_table.num_rows
)
# Assumption: GBQ cached table uses field name as bq column name
scan_list = nodes.ScanList(
tuple(
nodes.ScanItem(field.id, field.dtype, field.id.name)
nodes.ScanItem(
field.id,
field.dtype,
renames.get(field.id.name, field.id.name),
)
for field in self.node.fields
)
)
13 changes: 11 additions & 2 deletions bigframes/core/compile/compiler.py
@@ -213,16 +213,25 @@ def _table_to_ibis(
)
# Physical schema might include unused columns, unsupported datatypes like JSON
physical_schema = ibis_bigquery.BigQuerySchema.to_ibis(
list(source.table.physical_schema)
list(source.table.physical_schema) + list(source.table.pseudocolumns)
)
if source.at_time is not None or source.sql_predicate is not None:
if (
source.at_time is not None
or source.sql_predicate is not None
# ibis.table is not aware of pseudocolumns, so we compile to SQL ourselves.
or source.table.pseudocolumns
):
import bigframes.session._io.bigquery

sql = bigframes.session._io.bigquery.to_query(
full_table_name,
columns=scan_cols,
sql_predicate=source.sql_predicate,
time_travel_timestamp=source.at_time,
# Need to include pseudocolumns in case we're doing a SELECT *,
# as those wouldn't normally be included.
# TODO(tswast): Is scan_cols ever empty? If not, this might not be necessary.
pseudocolumns=[field.name for field in source.table.pseudocolumns],
)
return ibis_bigquery.Backend().sql(schema=physical_schema, query=sql)
else:
24 changes: 20 additions & 4 deletions bigframes/core/nodes.py
@@ -704,14 +704,19 @@ class GbqTable:
physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
is_physically_stored: bool = dataclasses.field()
cluster_cols: typing.Optional[Tuple[str, ...]]
pseudocolumns: Tuple[bq.SchemaField, ...] = dataclasses.field()

@staticmethod
def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
import bigframes.core.tools.bigquery # Avoid circular imports.

# Subsetting fields with columns can reduce cost of row-hash default ordering
table_schema = table.schema

if columns:
schema = tuple(item for item in table.schema if item.name in columns)
schema = tuple(item for item in table_schema if item.name in columns)
else:
schema = tuple(table.schema)
schema = tuple(table_schema)
return GbqTable(
project_id=table.project,
dataset_id=table.dataset_id,
@@ -721,6 +726,7 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
cluster_cols=None
if table.clustering_fields is None
else tuple(table.clustering_fields),
pseudocolumns=tuple(bigframes.core.tools.bigquery.get_pseudocolumns(table)),
)

def get_table_ref(self) -> bq.TableReference:
@@ -731,7 +737,10 @@ def get_table_ref(self) -> bq.TableReference:
@property
@functools.cache
def schema_by_id(self):
return {col.name: col for col in self.physical_schema}
return {
col.name: col
for col in itertools.chain(self.physical_schema, self.pseudocolumns)
}


@dataclasses.dataclass(frozen=True)
@@ -762,7 +771,14 @@ class ReadTableNode(LeafNode):

def _validate(self):
# enforce invariants
physical_names = set(map(lambda i: i.name, self.source.table.physical_schema))
physical_names = set(
map(
lambda i: i.name,
itertools.chain(
self.source.table.physical_schema, self.source.table.pseudocolumns
),
)
)
if not set(scan.source_id for scan in self.scan_list.items).issubset(
physical_names
):
18 changes: 14 additions & 4 deletions bigframes/core/schema.py
@@ -16,6 +16,7 @@

from dataclasses import dataclass
import functools
import itertools
import typing
from typing import Sequence

@@ -48,15 +49,24 @@ def from_bq_table(
typing.Dict[str, bigframes.dtypes.Dtype]
] = None,
):
# Avoid circular imports.
import bigframes.core.tools.bigquery

if column_type_overrides is None:
column_type_overrides = {}
items = tuple(
items = [
SchemaItem(name, column_type_overrides.get(name, dtype))
for name, dtype in bigframes.dtypes.bf_type_from_type_kind(
table.schema
list(
itertools.chain(
table.schema,
bigframes.core.tools.bigquery.get_pseudocolumns(table),
)
)
).items()
)
return ArraySchema(items)
]

return ArraySchema(tuple(items))

@property
def names(self) -> typing.Tuple[str, ...]:
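A minimal sketch of what this change implies for the derived schema (the table id and field are made up, and it assumes the client library accepts a wildcard suffix in a locally constructed `Table`):

```python
import google.cloud.bigquery as bigquery
import bigframes.core.schema as schemata

# Illustrative only: a wildcard table's ArraySchema should now pick up
# _TABLE_SUFFIX alongside the physical fields.
table = bigquery.Table(
    "my-project.my_dataset.events_*",
    schema=[bigquery.SchemaField("station_id", "STRING")],
)
schema = schemata.ArraySchema.from_bq_table(table)
print(schema.names)  # expected to include "_TABLE_SUFFIX"
```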
40 changes: 40 additions & 0 deletions bigframes/core/tools/bigquery.py
@@ -0,0 +1,40 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Private helpers for loading a BigQuery table as a BigQuery DataFrames DataFrame.
"""

from __future__ import annotations

import google.cloud.bigquery as bigquery


def get_pseudocolumns(
table: bigquery.table.Table,
) -> list[bigquery.SchemaField]:
"""Which pseudocolumns are available for this table?"""
fields = []

# TODO(tswast): Add _PARTITIONTIME and/or _PARTITIONDATE for ingestion
# time partitioned tables.
if table.table_id.endswith("*"):
fields.append(
bigquery.SchemaField(
"_TABLE_SUFFIX",
"STRING",
)
)

return fields
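A quick sketch of the helper's behavior (table ids are illustrative; it assumes the client library accepts a wildcard suffix when building a reference locally):

```python
import google.cloud.bigquery as bigquery
from bigframes.core.tools.bigquery import get_pseudocolumns

# Wildcard tables get the _TABLE_SUFFIX pseudocolumn; plain tables get none.
wildcard = bigquery.Table("my-project.my_dataset.events_*")
print([field.name for field in get_pseudocolumns(wildcard)])  # ["_TABLE_SUFFIX"]

plain = bigquery.Table("my-project.my_dataset.events_20250101")
print(get_pseudocolumns(plain))  # []
```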
20 changes: 20 additions & 0 deletions bigframes/core/utils.py
@@ -147,6 +147,26 @@ def label_to_identifier(label: typing.Hashable, strict: bool = False) -> str:
elif identifier[0].isdigit():
# first character must be letter or underscore
identifier = "_" + identifier

# Except in special circumstances (true anonymous query results tables),
# field names are not allowed to start with these (case-insensitive)
# prefixes.
# _PARTITION, _TABLE_, _FILE_, _ROW_TIMESTAMP, __ROOT__ and _COLIDENTIFIER
if any(
identifier.casefold().startswith(invalid_prefix.casefold())
for invalid_prefix in (
"_PARTITION",
"_TABLE_",
"_FILE_",
"_ROW_TIMESTAMP",
"__ROOT__",
"_COLIDENTIFIER",
)
):
# Remove leading _ character(s) to avoid collisions with reserved
# prefixes.
identifier = re.sub("^_+", "", identifier)

return identifier


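To illustrate the new prefix rule (inputs chosen for illustration, not taken from the PR's tests):

```python
from bigframes.core.utils import label_to_identifier

# Labels that collide with reserved BigQuery prefixes lose their leading
# underscores; ordinary labels pass through unchanged.
label_to_identifier("_TABLE_SUFFIX")   # -> "TABLE_SUFFIX"
label_to_identifier("_PARTITIONDATE")  # -> "PARTITIONDATE"
label_to_identifier("my_column")       # -> "my_column"
```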
7 changes: 7 additions & 0 deletions bigframes/session/_io/bigquery/__init__.py
@@ -377,6 +377,7 @@ def to_query(
sql_predicate: Optional[str],
max_results: Optional[int] = None,
time_travel_timestamp: Optional[datetime.datetime] = None,
pseudocolumns: Iterable[str] = (),
) -> str:
"""Compile query_or_table with conditions(filters, wildcards) to query."""
sub_query = (
@@ -392,6 +393,12 @@
else:
select_clause = "SELECT *"

if pseudocolumns:
pseudo_sql = ", ".join(
f"{column} AS _BF_{column}" for column in pseudocolumns
)
select_clause += f", {pseudo_sql}"

time_travel_clause = ""
if time_travel_timestamp is not None:
time_travel_literal = bigframes.core.sql.simple_literal(time_travel_timestamp)
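A sketch of what the new parameter does to the generated query (the table id is illustrative, and the call shape is approximated since the full signature is collapsed in this diff):

```python
from bigframes.session._io.bigquery import to_query

# The pseudocolumn is selected under a _BF_ alias so the result can be
# materialized; read_gbq_table renames it back afterwards.
sql = to_query(
    "bigquery-public-data.noaa_gsod.gsod*",
    columns=(),
    sql_predicate=None,
    pseudocolumns=["_TABLE_SUFFIX"],
)
# sql is roughly:
#   SELECT *, _TABLE_SUFFIX AS _BF__TABLE_SUFFIX
#   FROM `bigquery-public-data.noaa_gsod.gsod*`
```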
11 changes: 10 additions & 1 deletion bigframes/session/_io/bigquery/read_gbq_table.py
@@ -101,7 +101,16 @@ def validate_table(
# Anonymous dataset, does not support snapshot ever
if table.dataset_id.startswith("_"):
pass

# Only true tables support time travel
elif table.table_id.endswith("*"):
msg = bfe.format_message(
"Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that "
"modifications to the underlying data may result in errors or "
"unexpected behavior."
)
warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
elif table.table_type != "TABLE":
if table.table_type == "MATERIALIZED_VIEW":
msg = bfe.format_message(
@@ -137,7 +146,7 @@ def validate_table(
sql_predicate=filter_str,
time_travel_timestamp=None,
)
# Any erorrs here should just be raised to user
# Any errors here should just be raised to user
bqclient.query_and_wait(
snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
)
20 changes: 17 additions & 3 deletions bigframes/session/bq_caching_executor.py
@@ -33,6 +33,7 @@
import bigframes.core.nodes as nodes
import bigframes.core.ordering as order
import bigframes.core.tree_properties as tree_properties
import bigframes.core.utils as utils
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.features
@@ -346,8 +347,13 @@ def _cache_with_cluster_cols(
):
"""Executes the query and uses the resulting table to rewrite future executions."""

prev_col_ids = array_value.column_ids
new_col_ids, _ = utils.get_standardized_ids(prev_col_ids)
renames = dict(zip(prev_col_ids, new_col_ids))
renamed = array_value.rename_columns(renames)

sql, schema, ordering_info = self.compiler.compile_raw(
self.logical_plan(array_value.node)
self.logical_plan(renamed.node)
)
tmp_table = self._sql_as_cached_temp_table(
sql,
@@ -357,23 +363,31 @@
cached_replacement = array_value.as_cached(
cache_table=self.bqclient.get_table(tmp_table),
ordering=ordering_info,
renames=renames,
).node
self._cached_executions[array_value.node] = cached_replacement

def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
"""Executes the query and uses the resulting table to rewrite future executions."""
offset_column = bigframes.core.guid.generate_guid("bigframes_offsets")
w_offsets, offset_column = array_value.promote_offsets()
sql = self.compiler.compile(self.logical_plan(w_offsets.node), ordered=False)

prev_col_ids = w_offsets.column_ids
new_col_ids, _ = utils.get_standardized_ids(prev_col_ids)
renames = dict(zip(prev_col_ids, new_col_ids))
renamed = w_offsets.rename_columns(renames)

sql = self.compiler.compile(self.logical_plan(renamed.node), ordered=False)

tmp_table = self._sql_as_cached_temp_table(
sql,
w_offsets.schema.to_bigquery(),
renamed.schema.to_bigquery(),
cluster_cols=[offset_column],
)
cached_replacement = array_value.as_cached(
cache_table=self.bqclient.get_table(tmp_table),
ordering=order.TotalOrdering.from_offset_col(offset_column),
renames=renames,
).node
self._cached_executions[array_value.node] = cached_replacement

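For context, a small sketch of the rename step that precedes caching (column ids are made up; real ids are internal GUIDs): pseudocolumn names such as `_TABLE_SUFFIX` cannot be written as real column names in the cache table, so the ids are standardized first and the mapping is handed back through `as_cached` as `renames`.

```python
import bigframes.core.utils as utils

# Illustrative mapping: _TABLE_SUFFIX becomes a legal column name for the
# cached table, and `renames` lets the cached scan restore the original id.
prev_col_ids = ["_TABLE_SUFFIX", "station_id"]
new_col_ids, _ = utils.get_standardized_ids(prev_col_ids)
renames = dict(zip(prev_col_ids, new_col_ids))
# renames is roughly {"_TABLE_SUFFIX": "TABLE_SUFFIX", "station_id": "station_id"}
```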
23 changes: 17 additions & 6 deletions bigframes/session/loader.py
@@ -22,6 +22,7 @@
import os
import typing
from typing import (
cast,
Dict,
Generator,
Hashable,
@@ -48,6 +49,7 @@
import bigframes.core as core
import bigframes.core.blocks as blocks
import bigframes.core.schema as schemata
import bigframes.core.tools.bigquery
import bigframes.dtypes
import bigframes.formatting_helpers as formatting_helpers
from bigframes.session import dry_runs
@@ -518,16 +520,13 @@ def read_gbq_table(
# clustered tables, so fallback to a query. We do this here so that
# the index is consistent with tables that have primary keys, even
# when max_results is set.
# TODO(b/338419730): We don't need to fallback to a query for wildcard
# tables if we allow some non-determinism when time travel isn't supported.
if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix(
table_id
):
if max_results is not None:
# TODO(b/338111344): If we are running a query anyway, we might as
# well generate ROW_NUMBER() at the same time.
all_columns: Iterable[str] = (
itertools.chain(index_cols, columns) if columns else ()
)
pseudocolumns = bigframes.core.tools.bigquery.get_pseudocolumns(table)
query = bf_io_bigquery.to_query(
table_id,
columns=all_columns,
@@ -538,9 +537,10 @@
# We're executing the query, so we don't need time travel for
# determinism.
time_travel_timestamp=None,
pseudocolumns=[field.name for field in pseudocolumns],
)

return self.read_gbq_query( # type: ignore # for dry_run overload
df = self.read_gbq_query( # type: ignore # for dry_run overload
query,
index_col=index_cols,
columns=columns,
@@ -549,6 +549,17 @@
dry_run=dry_run,
)

if pseudocolumns and not dry_run:
df = cast(dataframe.DataFrame, df,).rename(
columns=dict(
zip(
[f"_BF_{field.name}" for field in pseudocolumns],
[field.name for field in pseudocolumns],
)
)
)
return df

if dry_run:
return dry_runs.get_table_stats(table)

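And a sketch of the rename-back the loader performs once the query result comes back, assuming a single `_TABLE_SUFFIX` pseudocolumn (names illustrative):

```python
# Illustrative: the _BF_ alias applied in to_query is mapped back to the
# user-visible pseudocolumn name on the resulting DataFrame.
pseudocolumn_names = ["_TABLE_SUFFIX"]
rename_map = dict(
    zip(
        [f"_BF_{name}" for name in pseudocolumn_names],
        pseudocolumn_names,
    )
)
# rename_map == {"_BF__TABLE_SUFFIX": "_TABLE_SUFFIX"}
# df = df.rename(columns=rename_map)
```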