Skip to content

feat: support INFORMATION_SCHEMA views in read_gbq #1895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
89 changes: 83 additions & 6 deletions bigframes/session/_io/bigquery/read_gbq_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,79 @@
import bigframes.session


def _convert_information_schema_table_id_to_table_reference(
    table_id: str,
    default_project: Optional[str],
) -> bigquery.TableReference:
    """Squeeze an INFORMATION_SCHEMA reference into a TableReference.

    This is kind-of a hack. INFORMATION_SCHEMA is a view that isn't available
    via the tables.get REST API.

    Args:
        table_id:
            Dotted identifier containing an ``INFORMATION_SCHEMA`` component,
            e.g. ``region-US.INFORMATION_SCHEMA.JOBS`` or
            ``my-project.region-US.INFORMATION_SCHEMA.SCHEMATA``.
        default_project:
            Project to fall back to when ``table_id`` has no prefix before
            ``INFORMATION_SCHEMA``. May be ``None``.

    Returns:
        A TableReference whose "dataset" is the literal string
        ``INFORMATION_SCHEMA``.

    Raises:
        ValueError: If no project can be determined, or (via ``list.index``)
            if ``table_id`` contains no ``INFORMATION_SCHEMA`` component.
    """
    parts = table_id.split(".")
    parts_casefold = [part.casefold() for part in parts]
    # Case-insensitive match; callers are expected to have already checked
    # that table_id contains an INFORMATION_SCHEMA component.
    dataset_index = parts_casefold.index("INFORMATION_SCHEMA".casefold())

    if dataset_index == 0:
        # Nothing before INFORMATION_SCHEMA, so use the session default.
        project = default_project
    else:
        # Everything before INFORMATION_SCHEMA is the project (possibly
        # including a region, e.g. "my-project.region-US").
        project = ".".join(parts[:dataset_index])

    if project is None:
        # Fixed grammar: the previous message read "...table ID, For example,"
        # (comma splice with a mid-sentence capital).
        message = (
            "Could not determine project ID. "
            "Please provide a project or region in your INFORMATION_SCHEMA table ID. "
            "For example, 'region-REGION_NAME.INFORMATION_SCHEMA.JOBS'."
        )
        raise ValueError(message)

    dataset = "INFORMATION_SCHEMA"
    table_id_short = ".".join(parts[dataset_index + 1 :])
    return bigquery.TableReference(
        bigquery.DatasetReference(project, dataset),
        table_id_short,
    )


def get_information_schema_metadata(
    bqclient: bigquery.Client,
    table_id: str,
    default_project: Optional[str],
) -> bigquery.Table:
    """Synthesize table metadata for an INFORMATION_SCHEMA view.

    INFORMATION_SCHEMA views can't be fetched via the tables.get REST API,
    so instead run a dry-run query to discover the schema and location and
    build a Table object by hand.
    """
    dry_run_config = bigquery.QueryJobConfig(dry_run=True)
    dry_run_job = bqclient.query(
        f"SELECT * FROM `{table_id}`",
        job_config=dry_run_config,
    )
    reference = _convert_information_schema_table_id_to_table_reference(
        table_id=table_id,
        default_project=default_project,
    )
    api_repr = {
        "tableReference": reference.to_api_repr(),
        "location": dry_run_job.location,
        # Prevent ourselves from trying to read the table with the BQ
        # Storage API.
        "type": "VIEW",
    }
    table = bigquery.Table.from_api_repr(api_repr)
    table.schema = dry_run_job.schema
    return table


def get_table_metadata(
bqclient: bigquery.Client,
table_ref: google.cloud.bigquery.table.TableReference,
bq_time: datetime.datetime,
*,
cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
table_id: str,
default_project: Optional[str],
bq_time: datetime.datetime,
cache: Dict[str, Tuple[datetime.datetime, bigquery.Table]],
use_cache: bool = True,
) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]:
"""Get the table metadata, either from cache or via REST API."""

cached_table = cache.get(table_ref)
cached_table = cache.get(table_id)
if use_cache and cached_table is not None:
snapshot_timestamp, _ = cached_table

Expand All @@ -76,15 +138,30 @@ def get_table_metadata(
warnings.warn(msg, stacklevel=7)
return cached_table

table = bqclient.get_table(table_ref)
table_id_casefold = table_id.casefold()
if (
# Ensure we don't have false positives for some user defined dataset
# like MY_INFORMATION_SCHEMA or tables called INFORMATION_SCHEMA.
".INFORMATION_SCHEMA.".casefold() in table_id_casefold
or table_id_casefold.startswith("INFORMATION_SCHEMA.".casefold())
):
table = get_information_schema_metadata(
bqclient=bqclient, table_id=table_id, default_project=default_project
)
else:
table_ref = google.cloud.bigquery.table.TableReference.from_string(
table_id, default_project=default_project
)
table = bqclient.get_table(table_ref)

    # local time will lag a little bit due to network latency
# make sure it is at least table creation time.
# This is relevant if the table was created immediately before loading it here.
if (table.created is not None) and (table.created > bq_time):
bq_time = table.created

cached_table = (bq_time, table)
cache[table_ref] = cached_table
cache[table_id] = cached_table
return cached_table


Expand Down
11 changes: 3 additions & 8 deletions bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,7 @@ def __init__(
self._default_index_type = default_index_type
self._scan_index_uniqueness = scan_index_uniqueness
self._force_total_order = force_total_order
self._df_snapshot: Dict[
bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]
] = {}
self._df_snapshot: Dict[str, Tuple[datetime.datetime, bigquery.Table]] = {}
self._metrics = metrics
# Unfortunate circular reference, but need to pass reference when constructing objects
self._session = session
Expand Down Expand Up @@ -617,10 +615,6 @@ def read_gbq_table(

_check_duplicates("columns", columns)

table_ref = google.cloud.bigquery.table.TableReference.from_string(
table_id, default_project=self._bqclient.project
)

columns = list(columns)
include_all_columns = columns is None or len(columns) == 0
filters = typing.cast(list, list(filters))
Expand All @@ -631,7 +625,8 @@ def read_gbq_table(

time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata(
self._bqclient,
table_ref=table_ref,
table_id=table_id,
default_project=self._bqclient.project,
bq_time=self._clock.get_time(),
cache=self._df_snapshot,
use_cache=use_cache,
Expand Down
3 changes: 3 additions & 0 deletions bigframes/session/read_api_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def execute(
if node.explicitly_ordered and ordered:
return None

if not node.source.table.is_physically_stored:
return None

if limit is not None:
if peek is None or limit < peek:
peek = limit
Expand Down
50 changes: 50 additions & 0 deletions tests/system/small/pandas/test_read_gbq_information_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest


@pytest.mark.parametrize("include_project", [True, False])
@pytest.mark.parametrize(
    "view_id",
    [
        # https://cloud.google.com/bigquery/docs/information-schema-intro
        "region-US.INFORMATION_SCHEMA.JOBS_BY_USER",
        "region-US.INFORMATION_SCHEMA.SCHEMATA",
    ],
)
def test_read_gbq_jobs_by_user_returns_schema(
    unordered_session, view_id: str, include_project: bool
):
    # Exercise both the bare view ID and the fully-qualified
    # (project-prefixed) form.
    prefix = unordered_session.bqclient.project + "." if include_project else ""
    df = unordered_session.read_gbq(prefix + view_id)
    assert df.dtypes is not None


def test_read_gbq_schemata_can_be_peeked(unordered_session):
    # peek() forces a read, so this checks the view is actually queryable.
    frame = unordered_session.read_gbq("region-US.INFORMATION_SCHEMA.SCHEMATA")
    assert frame.peek() is not None


def test_read_gbq_schemata_four_parts_can_be_peeked(unordered_session):
    # Four-part form: project.region.INFORMATION_SCHEMA.view.
    project = unordered_session.bqclient.project
    frame = unordered_session.read_gbq(
        f"{project}.region-US.INFORMATION_SCHEMA.SCHEMATA"
    )
    assert frame.peek() is not None
2 changes: 1 addition & 1 deletion tests/unit/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def test_read_gbq_cached_table():
table._properties["numRows"] = "1000000000"
table._properties["location"] = session._location
table._properties["type"] = "TABLE"
session._loader._df_snapshot[table_ref] = (
session._loader._df_snapshot[str(table_ref)] = (
datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
table,
)
Expand Down