diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 5024b01323..133d271fed 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -14,12 +14,15 @@ from __future__ import annotations from dataclasses import dataclass +import datetime import functools import io import itertools import typing -from typing import Iterable, Sequence +from typing import Iterable, Optional, Sequence +import warnings +import google.cloud.bigquery import ibis.expr.types as ibis_types import pandas import pyarrow as pa @@ -92,6 +95,35 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session): ) return cls(node) + @classmethod + def from_table( + cls, + table: google.cloud.bigquery.Table, + schema: schemata.ArraySchema, + session: Session, + *, + predicate: Optional[str] = None, + at_time: Optional[datetime.datetime] = None, + primary_key: Sequence[str] = (), + ): + if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names): + warnings.warn( + "Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.", + bigframes.exceptions.PreviewWarning, + ) + node = nodes.ReadTableNode( + project_id=table.reference.project, + dataset_id=table.reference.dataset_id, + table_id=table.reference.table_id, + physical_schema=tuple(table.schema), + total_order_cols=tuple(primary_key), + columns=schema, + at_time=at_time, + table_session=session, + sql_predicate=predicate, + ) + return cls(node) + @property def column_ids(self) -> typing.Sequence[str]: return self.schema.names diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index a9908192f3..f948d10a5b 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -17,12 +17,20 @@ import io import typing +import ibis +import ibis.backends +import ibis.backends.bigquery +import ibis.expr.types import pandas as pd import bigframes.core.compile.compiled as compiled import bigframes.core.compile.concat as concat_impl +import bigframes.core.compile.default_ordering as default_ordering +import bigframes.core.compile.schema_translator import bigframes.core.compile.single_column import bigframes.core.nodes as nodes +import bigframes.core.ordering as bf_ordering +import bigframes.dtypes as bigframes_dtypes if typing.TYPE_CHECKING: import bigframes.core @@ -88,6 +96,87 @@ def compile_readlocal(node: nodes.ReadLocalNode, ordered: bool = True): return ordered_ir.to_unordered() +@_compile_node.register +def compile_readtable(node: nodes.ReadTableNode, ordered: bool = True): + if ordered: + return compile_read_table_ordered(node) + else: + return compile_read_table_unordered(node) + + +def read_table_as_unordered_ibis(node: nodes.ReadTableNode) -> ibis.expr.types.Table: + full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}" + used_columns = ( + *node.schema.names, + *[i for i in node.total_order_cols if i not in node.schema.names], + ) + # Physical schema might include unused columns, unsupported datatypes like JSON + physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis( + list(i for i in node.physical_schema if i.name in used_columns) + ) + if node.at_time is not None or node.sql_predicate is not None: + import bigframes.session._io.bigquery + + sql = bigframes.session._io.bigquery.to_query( + full_table_name, + columns=used_columns, + sql_predicate=node.sql_predicate, + time_travel_timestamp=node.at_time, + ) + return ibis.backends.bigquery.Backend().sql(schema=physical_schema, query=sql) + else: + 
return ibis.table(physical_schema, full_table_name) + + +def compile_read_table_unordered(node: nodes.ReadTableNode): + ibis_table = read_table_as_unordered_ibis(node) + return compiled.UnorderedIR( + ibis_table, + tuple( + bigframes_dtypes.ibis_value_to_canonical_type(ibis_table[col]) + for col in node.schema.names + ), + ) + + +def compile_read_table_ordered(node: nodes.ReadTableNode): + ibis_table = read_table_as_unordered_ibis(node) + if node.total_order_cols: + ordering_value_columns = tuple( + bf_ordering.ascending_over(col) for col in node.total_order_cols + ) + if node.order_col_is_sequential: + integer_encoding = bf_ordering.IntegerEncoding( + is_encoded=True, is_sequential=True + ) + else: + integer_encoding = bf_ordering.IntegerEncoding() + ordering = bf_ordering.ExpressionOrdering( + ordering_value_columns, + integer_encoding=integer_encoding, + total_ordering_columns=frozenset(node.total_order_cols), + ) + hidden_columns = () + else: + ibis_table, ordering = default_ordering.gen_default_ordering( + ibis_table, use_double_hash=True + ) + hidden_columns = tuple( + ibis_table[col] + for col in ibis_table.columns + if col not in node.schema.names + ) + return compiled.OrderedIR( + ibis_table, + columns=tuple( + bigframes_dtypes.ibis_value_to_canonical_type(ibis_table[col]) + for col in node.schema.names + ), + ordering=ordering, + hidden_ordering_columns=hidden_columns, + ) + + @_compile_node.register def compile_readgbq(node: nodes.ReadGbqNode, ordered: bool = True): if ordered: diff --git a/bigframes/core/compile/default_ordering.py b/bigframes/core/compile/default_ordering.py new file mode 100644 index 0000000000..d8bdc0546b --- /dev/null +++ b/bigframes/core/compile/default_ordering.py @@ -0,0 +1,91 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Private helpers for loading a BigQuery table as a BigQuery DataFrames DataFrame. 
+""" + +from __future__ import annotations + +import itertools +from typing import cast + +import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops +import ibis +import ibis.backends +import ibis.expr.datatypes as ibis_dtypes +import ibis.expr.types as ibis_types + +import bigframes.core.guid as guid +import bigframes.core.ordering as order + + +def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue: + col_type = column.type() + if ( + col_type.is_numeric() + or col_type.is_boolean() + or col_type.is_binary() + or col_type.is_temporal() + ): + result = column.cast(ibis_dtypes.String(nullable=True)) + elif col_type.is_geospatial(): + result = cast(ibis_types.GeoSpatialColumn, column).as_text() + elif col_type.is_string(): + result = column + else: + # TO_JSON_STRING works with all data types, but isn't the most efficient + # Needed for JSON, STRUCT and ARRAY datatypes + result = vendored_ibis_ops.ToJsonString(column).to_expr() # type: ignore + # Escape backslashes and use backslash as delineator + escaped = cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\") # type: ignore + return cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped) + + +def gen_default_ordering(table: ibis.table, use_double_hash: bool = True): + ordering_hash_part = guid.generate_guid("bigframes_ordering_") + ordering_hash_part2 = guid.generate_guid("bigframes_ordering_") + ordering_rand_part = guid.generate_guid("bigframes_ordering_") + + # All inputs into hash must be non-null or resulting hash will be null + str_values = list( + map(lambda col: _convert_to_nonnull_string(table[col]), table.columns) + ) + full_row_str = ( + str_values[0].concat(*str_values[1:]) if len(str_values) > 1 else str_values[0] + ) + full_row_hash = full_row_str.hash().name(ordering_hash_part) + # By modifying value slightly, we get another hash uncorrelated with the first + full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2) + # Used to disambiguate between identical rows (which will have identical hash) + random_value = ibis.random().name(ordering_rand_part) + + order_values = ( + [full_row_hash, full_row_hash_p2, random_value] + if use_double_hash + else [full_row_hash, random_value] + ) + + original_column_ids = table.columns + table_with_ordering = table.select( + itertools.chain(original_column_ids, order_values) + ) + + ordering = order.ExpressionOrdering( + ordering_value_columns=tuple( + order.ascending_over(col.get_name()) for col in order_values + ), + total_ordering_columns=frozenset(col.get_name() for col in order_values), + ) + return table_with_ordering, ordering diff --git a/bigframes/core/compile/schema_translator.py b/bigframes/core/compile/schema_translator.py new file mode 100644 index 0000000000..03e9691af6 --- /dev/null +++ b/bigframes/core/compile/schema_translator.py @@ -0,0 +1,34 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+
+from __future__ import annotations
+
+import ibis
+import ibis.expr.schema
+
+import bigframes.core.schema as bf_schema
+import bigframes.dtypes
+
+
+def convert_bf_schema(schema: bf_schema.ArraySchema) -> ibis.expr.schema.Schema:
+    """
+    Convert bigframes schema to ibis schema. This is unambiguous as every bigframes type is backed by a specific SQL/ibis dtype.
+    """
+    names = schema.names
+    types = [
+        bigframes.dtypes.bigframes_dtype_to_ibis_dtype(bf_type)
+        for bf_type in schema.dtypes
+    ]
+    return ibis.schema(names=names, types=types)
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 688e165732..1af7c5bd17 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -16,11 +16,14 @@
 
 import abc
 from dataclasses import dataclass, field, fields, replace
+import datetime
 import functools
 import itertools
 import typing
 from typing import Callable, Tuple
 
+import google.cloud.bigquery as bq
+
 import bigframes.core.expression as ex
 import bigframes.core.guid
 from bigframes.core.join_def import JoinColumnMapping, JoinDefinition, JoinSide
@@ -345,6 +348,67 @@ def transform_children(
         return self
 
 
+## Put ordering in here or just add order_by node above?
+@dataclass(frozen=True)
+class ReadTableNode(BigFrameNode):
+    project_id: str = field()
+    dataset_id: str = field()
+    table_id: str = field()
+
+    physical_schema: Tuple[bq.SchemaField, ...] = field()
+    # Subset of physical schema columns, with chosen BQ types
+    columns: schemata.ArraySchema = field()
+
+    table_session: bigframes.session.Session = field()
+    # Empty tuple if no primary key (primary key can be any set of columns that together form a unique key)
+    # Empty if no known unique key
+    total_order_cols: Tuple[str, ...] = field()
+    # indicates a primary key that is exactly offsets 0, 1, 2, ..., N-2, N-1
+    order_col_is_sequential: bool = False
+    at_time: typing.Optional[datetime.datetime] = None
+    # Added for backwards compatibility, not validated
+    sql_predicate: typing.Optional[str] = None
+
+    def __post_init__(self):
+        # enforce invariants
+        physical_names = set(map(lambda i: i.name, self.physical_schema))
+        if not set(self.columns.names).issubset(physical_names):
+            raise ValueError(
+                f"Requested schema {self.columns} cannot be derived from table schema {self.physical_schema}"
+            )
+        if self.order_col_is_sequential and len(self.total_order_cols) != 1:
+            raise ValueError("Sequential primary key must have only one component")
+
+    @property
+    def session(self):
+        return self.table_session
+
+    def __hash__(self):
+        return self._node_hash
+
+    @property
+    def roots(self) -> typing.Set[BigFrameNode]:
+        return {self}
+
+    @property
+    def schema(self) -> schemata.ArraySchema:
+        return self.columns
+
+    @property
+    def relation_ops_created(self) -> int:
+        # Assume worst case, where readgbq actually has baked in analytic operation to generate index
+        return 3
+
+    @functools.cached_property
+    def variables_introduced(self) -> int:
+        return len(self.schema.items) + 1
+
+    def transform_children(
+        self, t: Callable[[BigFrameNode], BigFrameNode]
+    ) -> BigFrameNode:
+        return self
+
+
 # Unary nodes
 @dataclass(frozen=True)
 class PromoteOffsetsNode(UnaryNode):
diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py
index 3629778aaf..30a2a6593a 100644
--- a/bigframes/core/schema.py
+++ b/bigframes/core/schema.py
@@ -18,6 +18,8 @@
 import functools
 import typing
 
+import google.cloud.bigquery
+
 import bigframes.core.guid
 import bigframes.dtypes
 
@@ -34,6 +36,16 @@ class SchemaItem:
 class ArraySchema:
     items:
typing.Tuple[SchemaItem, ...] + @classmethod + def from_bq_table(cls, table: google.cloud.bigquery.Table): + items = tuple( + SchemaItem(name, dtype) + for name, dtype in bigframes.dtypes.bf_type_from_type_kind( + table.schema + ).items() + ) + return ArraySchema(items) + @property def names(self) -> typing.Tuple[str, ...]: return tuple(item.column for item in self.items) @@ -51,6 +63,11 @@ def drop(self, columns: typing.Iterable[str]) -> ArraySchema: tuple(item for item in self.items if item.column not in columns) ) + def select(self, columns: typing.Iterable[str]) -> ArraySchema: + return ArraySchema( + tuple(SchemaItem(name, self.get_type(name)) for name in columns) + ) + def append(self, item: SchemaItem): return ArraySchema(tuple([*self.items, item])) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 2a344aff2d..0c32a81404 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -19,6 +19,7 @@ import textwrap import typing from typing import Any, Dict, Iterable, Literal, Tuple, Union +import warnings import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops @@ -33,6 +34,7 @@ import pyarrow as pa import bigframes.constants as constants +import bigframes.exceptions # Type hints for Pandas dtypes supported by BigQuery DataFrame Dtype = Union[ @@ -181,6 +183,10 @@ def is_bool_coercable(type: ExpressionType) -> bool: ibis_dtypes.Decimal(precision=76, scale=38, nullable=True), pd.ArrowDtype(pa.decimal256(76, 38)), ), + ( + ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True), + gpd.array.GeometryDtype(), + ), ) BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = { @@ -212,9 +218,6 @@ def is_bool_coercable(type: ExpressionType) -> bool: ) IBIS_TO_BIGFRAMES.update( { - ibis_dtypes.GeoSpatial( - geotype="geography", srid=4326, nullable=True - ): gpd.array.GeometryDtype(), # TODO: Interval } ) @@ -280,6 +283,14 @@ def ibis_dtype_to_bigframes_dtype( if isinstance(ibis_dtype, ibis_dtypes.Integer): return pd.Int64Dtype() + # Temporary: Will eventually support an explicit json type instead of casting to string. + if isinstance(ibis_dtype, ibis_dtypes.JSON): + warnings.warn( + "Interpreting JSON as string. This behavior may change in future versions.", + bigframes.exceptions.PreviewWarning, + ) + return STRING_DTYPE + if ibis_dtype in IBIS_TO_BIGFRAMES: return IBIS_TO_BIGFRAMES[ibis_dtype] elif isinstance(ibis_dtype, ibis_dtypes.Decimal): diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index c953087b3e..c8601c101e 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -19,6 +19,7 @@ import collections.abc import copy import datetime +import itertools import logging import math import os @@ -85,6 +86,7 @@ import bigframes.core.nodes as nodes from bigframes.core.ordering import IntegerEncoding import bigframes.core.ordering as order +import bigframes.core.schema as schemata import bigframes.core.tree_properties as traversals import bigframes.core.tree_properties as tree_properties import bigframes.core.utils as utils @@ -621,11 +623,11 @@ def _read_gbq_query( if len(filters) != 0 or max_results is not None: # TODO(b/338111344): If we are running a query anyway, we might as # well generate ROW_NUMBER() at the same time. 
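For context on the call rewritten just below: index columns and requested columns are now merged into one column list, and the pandas-gbq-style filters are pre-compiled into a SQL predicate string before being passed to to_query. A rough sketch with hypothetical table and column names:

    import bigframes.session._io.bigquery as bf_io_bigquery

    # Invented inputs standing in for index_cols, columns and filters.
    predicate = bf_io_bigquery.compile_filters(
        [("species", "==", "penguin"), ("mass_g", ">", 1000)]
    )
    sql = bf_io_bigquery.to_query(
        "my-project.my_dataset.penguins",  # hypothetical table
        columns=("tag_id", "species", "mass_g"),
        sql_predicate=predicate,
        max_results=10,
    )
    # Produces roughly:
    #   SELECT `tag_id`, `species`, `mass_g` FROM `my-project.my_dataset.penguins`
    #   WHERE `species` = 'penguin' AND `mass_g` > 1000 LIMIT 10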
+ all_columns = itertools.chain(index_cols, columns) if columns else () query = bf_io_bigquery.to_query( query, - index_cols, - columns, - filters, + all_columns, + bf_io_bigquery.compile_filters(filters) if filters else None, max_results=max_results, # We're executing the query, so we don't need time travel for # determinism. @@ -741,7 +743,6 @@ def _read_gbq_table( # Fetch table metadata and validate # --------------------------------- - time_travel_timestamp: Optional[datetime.datetime] = None time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata( self.bqclient, table_ref=table_ref, @@ -799,11 +800,13 @@ def _read_gbq_table( ): # TODO(b/338111344): If we are running a query anyway, we might as # well generate ROW_NUMBER() at the same time. + all_columns = itertools.chain(index_cols, columns) if columns else () query = bf_io_bigquery.to_query( query, - index_cols=index_cols, - columns=columns, - filters=filters, + columns=all_columns, + sql_predicate=bf_io_bigquery.compile_filters(filters) + if filters + else None, max_results=max_results, # We're executing the query, so we don't need time travel for # determinism. @@ -819,7 +822,7 @@ def _read_gbq_table( ) # ----------------------------------------- - # Create Ibis table expression and validate + # Validate table access and features # ----------------------------------------- # Use a time travel to make sure the DataFrame is deterministic, even @@ -828,37 +831,15 @@ def _read_gbq_table( # If a dry run query fails with time travel but # succeeds without it, omit the time travel clause and raise a warning # about potential non-determinism if the underlying tables are modified. - sql = bigframes.session._io.bigquery.to_query( - f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}", - index_cols=index_cols, - columns=columns, - filters=filters, - time_travel_timestamp=time_travel_timestamp, - max_results=None, + filter_str = bf_io_bigquery.compile_filters(filters) if filters else None + all_columns = ( + () + if len(columns) == 0 + else (*columns, *[col for col in index_cols if col not in columns]) ) - dry_run_config = bigquery.QueryJobConfig() - dry_run_config.dry_run = True - try: - self._start_query(sql, job_config=dry_run_config, api_name=api_name) - except google.api_core.exceptions.NotFound: - # note that a notfound caused by a simple typo will be - # caught above when the metadata is fetched, not here - time_travel_timestamp = None - warnings.warn( - "NotFound error when reading table with time travel." - " Attempting query without time travel. 
Warning: Without" - " time travel, modifications to the underlying table may" - " result in errors or unexpected behavior.", - category=bigframes.exceptions.TimeTravelDisabledWarning, - ) - table_expression = bf_read_gbq_table.get_ibis_time_travel_table( - ibis_client=self.ibis_client, - table_ref=table_ref, - index_cols=index_cols, - columns=columns, - filters=filters, - time_travel_timestamp=time_travel_timestamp, + supports_snapshot = bf_read_gbq_table.validate_table( + self.bqclient, table_ref, all_columns, time_travel_timestamp, filter_str ) # ---------------------------- @@ -876,20 +857,17 @@ def _read_gbq_table( index_cols=index_cols, api_name=api_name, ) - - if is_index_unique: - array_value = bf_read_gbq_table.to_array_value_with_total_ordering( - session=self, - table_expression=table_expression, - total_ordering_cols=index_cols, - ) - else: - # Note: Even though we're adding a default ordering here, that's - # just so we have a deterministic total ordering. If the user - # specified a non-unique index, we still sort by that later. - array_value = bf_read_gbq_table.to_array_value_with_default_ordering( - session=self, table=table_expression, table_rows=table.num_rows - ) + schema = schemata.ArraySchema.from_bq_table(table) + if columns: + schema = schema.select(index_cols + columns) + array_value = core.ArrayValue.from_table( + table, + schema=schema, + predicate=filter_str, + at_time=time_travel_timestamp if supports_snapshot else None, + primary_key=index_cols if is_index_unique else (), + session=self, + ) # ---------------------------------------------------- # Create Default Sequential Index if still have no index diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index 6afa86aa2d..f26ca26c2a 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -392,14 +392,12 @@ def is_table_with_wildcard_suffix(query_or_table: str) -> bool: def to_query( query_or_table: str, - index_cols: Iterable[str], columns: Iterable[str], - filters: third_party_pandas_gbq.FiltersType, - max_results: Optional[int], - time_travel_timestamp: Optional[datetime.datetime], + sql_predicate: Optional[str], + max_results: Optional[int] = None, + time_travel_timestamp: Optional[datetime.datetime] = None, ) -> str: """Compile query_or_table with conditions(filters, wildcards) to query.""" - filters = list(filters) sub_query = ( f"({query_or_table})" if is_query(query_or_table) else f"`{query_or_table}`" ) @@ -409,8 +407,7 @@ def to_query( if columns: # We only reduce the selection if columns is set, but we always # want to make sure index_cols is also included. - all_columns = itertools.chain(index_cols, columns) - select_clause = "SELECT " + ", ".join(f"`{column}`" for column in all_columns) + select_clause = "SELECT " + ", ".join(f"`{column}`" for column in columns) else: select_clause = "SELECT *" @@ -423,77 +420,84 @@ def to_query( if max_results is not None: limit_clause = f" LIMIT {bigframes.core.sql.simple_literal(max_results)}" - filter_string = "" - if filters: - valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = { - "in": "IN", - "not in": "NOT IN", - "LIKE": "LIKE", - "==": "=", - ">": ">", - "<": "<", - ">=": ">=", - "<=": "<=", - "!=": "!=", - } - - # If single layer filter, add another pseudo layer. So the single layer represents "and" logic. 
- if isinstance(filters[0], tuple) and ( - len(filters[0]) == 0 or not isinstance(list(filters[0])[0], tuple) - ): - filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters]) - - for group in filters: - if not isinstance(group, Iterable): - group = [group] - - and_expression = "" - for filter_item in group: - if not isinstance(filter_item, tuple) or (len(filter_item) != 3): - raise ValueError( - f"Elements of filters must be tuples of length 3, but got {repr(filter_item)}.", - ) - - column, operator, value = filter_item - - if not isinstance(column, str): - raise ValueError( - f"Column name should be a string, but received '{column}' of type {type(column).__name__}." - ) - - if operator not in valid_operators: - raise ValueError(f"Operator {operator} is not valid.") - - operator_str = valid_operators[operator] - - column_ref = bigframes.core.sql.identifier(column) - if operator_str in ["IN", "NOT IN"]: - value_literal = bigframes.core.sql.multi_literal(*value) - else: - value_literal = bigframes.core.sql.simple_literal(value) - expression = bigframes.core.sql.infix_op( - operator_str, column_ref, value_literal - ) - if and_expression: - and_expression = bigframes.core.sql.infix_op( - "AND", and_expression, expression - ) - else: - and_expression = expression - - if filter_string: - filter_string = bigframes.core.sql.infix_op( - "OR", filter_string, and_expression - ) - else: - filter_string = and_expression - - where_clause = "" - if filter_string: - where_clause = f" WHERE {filter_string}" + where_clause = f" WHERE {sql_predicate}" if sql_predicate else "" return ( f"{select_clause} " f"FROM {sub_query}" f"{time_travel_clause}{where_clause}{limit_clause}" ) + + +def compile_filters(filters: third_party_pandas_gbq.FiltersType) -> str: + """Compiles a set of filters into a boolean sql expression""" + if not filters: + return "" + filter_string = "" + valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = { + "in": "IN", + "not in": "NOT IN", + "LIKE": "LIKE", + "==": "=", + ">": ">", + "<": "<", + ">=": ">=", + "<=": "<=", + "!=": "!=", + } + + # If single layer filter, add another pseudo layer. So the single layer represents "and" logic. + filters_list: list = list(filters) + if isinstance(filters_list[0], tuple) and ( + len(filters_list[0]) == 0 or not isinstance(list(filters_list[0])[0], tuple) + ): + filter_items = [filters_list] + else: + filter_items = filters_list + + for group in filter_items: + if not isinstance(group, Iterable): + group = [group] + + and_expression = "" + for filter_item in group: + if not isinstance(filter_item, tuple) or (len(filter_item) != 3): + raise ValueError( + f"Elements of filters must be tuples of length 3, but got {repr(filter_item)}.", + ) + + column, operator, value = filter_item + + if not isinstance(column, str): + raise ValueError( + f"Column name should be a string, but received '{column}' of type {type(column).__name__}." 
+ ) + + if operator not in valid_operators: + raise ValueError(f"Operator {operator} is not valid.") + + operator_str = valid_operators[operator] + + column_ref = bigframes.core.sql.identifier(column) + if operator_str in ["IN", "NOT IN"]: + value_literal = bigframes.core.sql.multi_literal(*value) + else: + value_literal = bigframes.core.sql.simple_literal(value) + expression = bigframes.core.sql.infix_op( + operator_str, column_ref, value_literal + ) + if and_expression: + and_expression = bigframes.core.sql.infix_op( + "AND", and_expression, expression + ) + else: + and_expression = expression + + if filter_string: + filter_string = bigframes.core.sql.infix_op( + "OR", filter_string, and_expression + ) + else: + filter_string = and_expression + + return filter_string diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index e00892fce9..879a8ba44c 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -19,27 +19,18 @@ from __future__ import annotations import datetime -import itertools import typing -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Sequence, Tuple import warnings -import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops -import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq import google.api_core.exceptions import google.cloud.bigquery as bigquery -import ibis -import ibis.backends -import ibis.expr.datatypes as ibis_dtypes -import ibis.expr.types as ibis_types import bigframes import bigframes.clients import bigframes.constants -import bigframes.core as core import bigframes.core.compile -import bigframes.core.guid as guid -import bigframes.core.ordering as order +import bigframes.core.compile.default_ordering import bigframes.core.sql import bigframes.dtypes import bigframes.session._io.bigquery @@ -51,28 +42,6 @@ import bigframes.session -def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue: - col_type = column.type() - if ( - col_type.is_numeric() - or col_type.is_boolean() - or col_type.is_binary() - or col_type.is_temporal() - ): - result = column.cast(ibis_dtypes.String(nullable=True)) - elif col_type.is_geospatial(): - result = typing.cast(ibis_types.GeoSpatialColumn, column).as_text() - elif col_type.is_string(): - result = column - else: - # TO_JSON_STRING works with all data types, but isn't the most efficient - # Needed for JSON, STRUCT and ARRAY datatypes - result = vendored_ibis_ops.ToJsonString(column).to_expr() # type: ignore - # Escape backslashes and use backslash as delineator - escaped = typing.cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\") # type: ignore - return typing.cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped) - - def get_table_metadata( bqclient: bigquery.Client, table_ref: google.cloud.bigquery.table.TableReference, @@ -128,39 +97,55 @@ def get_table_metadata( return cached_table -def get_ibis_time_travel_table( - ibis_client: ibis.BaseBackend, - table_ref: bigquery.TableReference, - index_cols: Iterable[str], - columns: Iterable[str], - filters: third_party_pandas_gbq.FiltersType, - time_travel_timestamp: Optional[datetime.datetime], -) -> ibis_types.Table: - # If we have an anonymous query results table, it can't be modified and - # there isn't any BigQuery time travel. 
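The removed comment above still describes the behavior that matters here: anonymous query-result datasets cannot be time traveled. With this change the check moves into validate_table below, which dry-runs the read with and without a snapshot clause. A sketch of the snapshot SQL, using hypothetical names and only approximate formatting:

    import datetime

    import bigframes.session._io.bigquery as bf_io_bigquery

    ts = datetime.datetime(2024, 4, 1, tzinfo=datetime.timezone.utc)  # hypothetical snapshot time
    sql = bf_io_bigquery.to_query(
        "my-project.my_dataset.my_table",  # hypothetical table
        columns=("id", "value"),
        sql_predicate=None,
        time_travel_timestamp=ts,
    )
    # Roughly:
    #   SELECT `id`, `value` FROM `my-project.my_dataset.my_table`
    #   FOR SYSTEM_TIME AS OF TIMESTAMP('2024-04-01T00:00:00+00:00')
    # Datasets whose id starts with "_" (anonymous query results) reject this
    # clause, so validate_table returns False for them and the snapshot is skipped.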
- if table_ref.dataset_id.startswith("_"): - time_travel_timestamp = None - +def validate_table( + bqclient: bigquery.Client, + table_ref: bigquery.table.TableReference, + columns: Optional[Sequence[str]], + snapshot_time: datetime.datetime, + filter_str: Optional[str] = None, +) -> bool: + """Validates that the table can be read, returns True iff snapshot is supported.""" + # First run without snapshot to verify table can be read + sql = bigframes.session._io.bigquery.to_query( + query_or_table=f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}", + columns=columns or (), + sql_predicate=filter_str, + ) + dry_run_config = bigquery.QueryJobConfig() + dry_run_config.dry_run = True try: - return ibis_client.sql( - bigframes.session._io.bigquery.to_query( - f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}", - index_cols=index_cols, - columns=columns, - filters=filters, - time_travel_timestamp=time_travel_timestamp, - # If we've made it this far, we know we don't have any - # max_results to worry about, because in that case we will - # have executed a query with a LIMI clause. - max_results=None, - ) - ) + bqclient.query_and_wait(sql, job_config=dry_run_config) except google.api_core.exceptions.Forbidden as ex: - # Ibis does a dry run to get the types of the columns from the SQL. if "Drive credentials" in ex.message: ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." raise + # Anonymous dataset, does not support snapshot ever + if table_ref.dataset_id.startswith("_"): + return False + + # Second, try with snapshot to verify table supports this feature + snapshot_sql = bigframes.session._io.bigquery.to_query( + query_or_table=f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}", + columns=columns or (), + sql_predicate=filter_str, + time_travel_timestamp=snapshot_time, + ) + try: + bqclient.query_and_wait(snapshot_sql, job_config=dry_run_config) + return True + except google.api_core.exceptions.NotFound: + # note that a notfound caused by a simple typo will be + # caught above when the metadata is fetched, not here + warnings.warn( + "NotFound error when reading table with time travel." + " Attempting query without time travel. 
Warning: Without" + " time travel, modifications to the underlying table may" + " result in errors or unexpected behavior.", + category=bigframes.exceptions.TimeTravelDisabledWarning, + ) + return False + def are_index_cols_unique( bqclient: bigquery.Client, @@ -287,81 +272,52 @@ def get_index_cols( return index_cols -def to_array_value_with_total_ordering( - session: bigframes.session.Session, - table_expression: ibis_types.Table, - total_ordering_cols: List[str], -) -> core.ArrayValue: - """Create an ArrayValue, assuming we already have a total ordering.""" - ordering = order.ExpressionOrdering( - ordering_value_columns=tuple( - order.ascending_over(column_id) for column_id in total_ordering_cols - ), - total_ordering_columns=frozenset(total_ordering_cols), - ) - column_values = [table_expression[col] for col in table_expression.columns] - return core.ArrayValue.from_ibis( - session, - table_expression, - columns=column_values, - hidden_ordering_columns=[], - ordering=ordering, - ) - - -def to_array_value_with_default_ordering( - session: bigframes.session.Session, - table: ibis_types.Table, - table_rows: Optional[int], -) -> core.ArrayValue: - """Create an ArrayValue with a deterministic default ordering.""" - # Since this might also be used as the index, don't use the default - # "ordering ID" name. - - # For small tables, 64 bits is enough to avoid collisions, 128 bits will never ever collide no matter what - # Assume table is large if table row count is unknown - use_double_hash = (table_rows is None) or (table_rows == 0) or (table_rows > 100000) - - ordering_hash_part = guid.generate_guid("bigframes_ordering_") - ordering_hash_part2 = guid.generate_guid("bigframes_ordering_") - ordering_rand_part = guid.generate_guid("bigframes_ordering_") +def get_time_travel_datetime_and_table_metadata( + bqclient: bigquery.Client, + table_ref: bigquery.TableReference, + *, + api_name: str, + cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], + use_cache: bool = True, +) -> Tuple[datetime.datetime, bigquery.Table]: + cached_table = cache.get(table_ref) + if use_cache and cached_table is not None: + snapshot_timestamp, _ = cached_table - # All inputs into hash must be non-null or resulting hash will be null - str_values = list( - map(lambda col: _convert_to_nonnull_string(table[col]), table.columns) - ) - full_row_str = ( - str_values[0].concat(*str_values[1:]) if len(str_values) > 1 else str_values[0] - ) - full_row_hash = full_row_str.hash().name(ordering_hash_part) - # By modifying value slightly, we get another hash uncorrelated with the first - full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2) - # Used to disambiguate between identical rows (which will have identical hash) - random_value = ibis.random().name(ordering_rand_part) - - order_values = ( - [full_row_hash, full_row_hash_p2, random_value] - if use_double_hash - else [full_row_hash, random_value] - ) + # Cache hit could be unexpected. See internal issue 329545805. + # Raise a warning with more information about how to avoid the + # problems with the cache. + warnings.warn( + f"Reading cached table from {snapshot_timestamp} to avoid " + "incompatibilies with previous reads of this table. 
To read " + "the latest version, set `use_cache=False` or close the " + "current session with Session.close() or " + "bigframes.pandas.close_session().", + # There are many layers before we get to (possibly) the user's code: + # pandas.read_gbq_table + # -> with_default_session + # -> Session.read_gbq_table + # -> _read_gbq_table + # -> _get_snapshot_sql_and_primary_key + # -> get_snapshot_datetime_and_table_metadata + stacklevel=7, + ) + return cached_table - original_column_ids = table.columns - table_with_ordering = table.select( - itertools.chain(original_column_ids, order_values) - ) + # TODO(swast): It's possible that the table metadata is changed between now + # and when we run the CURRENT_TIMESTAMP() query to see when we can time + # travel to. Find a way to fetch the table metadata and BQ's current time + # atomically. + table = bqclient.get_table(table_ref) - ordering = order.ExpressionOrdering( - ordering_value_columns=tuple( - order.ascending_over(col.get_name()) for col in order_values - ), - total_ordering_columns=frozenset(col.get_name() for col in order_values), - ) - columns = [table_with_ordering[col] for col in original_column_ids] - hidden_columns = [table_with_ordering[col.get_name()] for col in order_values] - return core.ArrayValue.from_ibis( - session, - table_with_ordering, - columns, - hidden_ordering_columns=hidden_columns, - ordering=ordering, - ) + job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + snapshot_timestamp = list( + bqclient.query( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + job_config=job_config, + ).result() + )[0][0] + cached_table = (snapshot_timestamp, table) + cache[table_ref] = cached_table + return cached_table diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index d687643c8a..1d6be3dff8 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -227,7 +227,6 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) @pytest.mark.parametrize( ( "query_or_table", - "index_cols", "columns", "filters", "max_results", @@ -237,8 +236,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) [ pytest.param( "test_table", - ["row_index"], - ["string_col"], + ["row_index", "string_col"], [ (("rowindex", "not in", [0, 6]),), (("string_col", "in", ["Hello, World!", "こんにちは"]),), @@ -261,8 +259,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) FROM `test_table` AS t """ ), - ["rowindex"], - ["string_col"], + ["rowindex", "string_col"], [ ("rowindex", "<", 4), ("string_col", "==", "Hello, World!"), @@ -283,7 +280,6 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) ), pytest.param( "test_table", - [], ["col_a", "col_b"], [], None, # max_results @@ -294,7 +290,6 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) pytest.param( "test_table", [], - [], [("date_col", ">", "2022-10-20")], None, # max_results None, # time_travel_timestampe @@ -305,7 +300,6 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) "test_table*", [], [], - [], None, # max_results None, # time_travel_timestampe "SELECT * FROM `test_table*`", @@ -314,7 +308,6 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) pytest.param( "test_table*", [], - [], [("_TABLE_SUFFIX", ">", "2022-10-20")], None, # max_results None, # time_travel_timestampe @@ 
-325,7 +318,6 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) ) def test_to_query( query_or_table, - index_cols, columns, filters, max_results, @@ -334,9 +326,8 @@ def test_to_query( ): query = io_bq.to_query( query_or_table, - index_cols=index_cols, columns=columns, - filters=filters, + sql_predicate=io_bq.compile_filters(filters), max_results=max_results, time_travel_timestamp=time_travel_timestamp, ) @@ -356,9 +347,8 @@ def test_to_query_fails_with_bad_filters(filters, expected_message): with pytest.raises(ValueError, match=re.escape(expected_message)): io_bq.to_query( "test_table", - index_cols=(), columns=(), - filters=filters, + sql_predicate=io_bq.compile_filters(filters), max_results=None, time_travel_timestamp=None, ) diff --git a/tests/unit/session/test_read_gbq_table.py b/tests/unit/session/test_read_gbq_table.py index 52c86cd1e4..6933957e53 100644 --- a/tests/unit/session/test_read_gbq_table.py +++ b/tests/unit/session/test_read_gbq_table.py @@ -14,11 +14,9 @@ """Unit tests for read_gbq_table helper functions.""" -import datetime import unittest.mock as mock import google.cloud.bigquery -import google.cloud.bigquery as bigquery import pytest import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table @@ -26,30 +24,6 @@ from .. import resources -def test_get_ibis_time_travel_table_doesnt_timetravel_anonymous_datasets(): - bqsession = resources.create_bigquery_session() - - table_ref = bigquery.TableReference.from_string( - "my-test-project._e8166e0cdb.anonbb92cd" - ) - - table_expression = bf_read_gbq_table.get_ibis_time_travel_table( - bqsession.ibis_client, - table_ref, - index_cols=(), - columns=(), - filters=(), - time_travel_timestamp=datetime.datetime.now(datetime.timezone.utc), - ) - sql = table_expression.compile() - - # Anonymous query results tables don't support time travel. - assert "SYSTEM_TIME" not in sql - - # Need fully-qualified table name. 
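Since the old ibis-based time-travel test is being removed here and to_query now takes a pre-compiled predicate, the filter compilation can also be covered directly. A hypothetical sketch of such a test, consistent with the expected SQL in the parametrized cases above:

    import bigframes.session._io.bigquery as io_bq


    def test_compile_filters_joins_group_with_and():
        predicate = io_bq.compile_filters(
            [("rowindex", "<", 4), ("string_col", "==", "Hello, World!")]
        )
        assert "`rowindex` < 4" in predicate
        assert " AND " in predicate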
- assert "my-test-project" in sql - - @pytest.mark.parametrize( ("index_cols", "primary_keys", "values_distinct", "expected"), ( diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 4f5daebc87..31029abd67 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -35,6 +35,14 @@ "datasetId": "my_dataset", "tableId": "my_table", } +SCHEMA = { + "fields": [ + {"name": "col1", "type": "INTEGER"}, + {"name": "col2", "type": "INTEGER"}, + {"name": "col3", "type": "INTEGER"}, + {"name": "col4", "type": "INTEGER"}, + ] +} CLUSTERED_OR_PARTITIONED_TABLES = [ pytest.param( google.cloud.bigquery.Table.from_api_repr( @@ -43,6 +51,7 @@ "clustering": { "fields": ["col1", "col2"], }, + "schema": SCHEMA, }, ), id="clustered", @@ -59,6 +68,7 @@ "interval": 1, }, }, + "schema": SCHEMA, }, ), id="range-partitioned", @@ -71,6 +81,7 @@ "type": "MONTH", "field": "col1", }, + "schema": SCHEMA, }, ), id="time-partitioned", @@ -86,6 +97,7 @@ "type": "MONTH", "field": "col1", }, + "schema": SCHEMA, }, ), id="time-partitioned-and-clustered", @@ -163,22 +175,18 @@ def test_read_gbq_cached_table(): google.cloud.bigquery.DatasetReference("my-project", "my_dataset"), "my_table", ) - table = google.cloud.bigquery.Table(table_ref) + table = google.cloud.bigquery.Table( + table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),) + ) + table._properties["location"] = session._location + table._properties["numRows"] = "1000000000" table._properties["location"] = session._location session._df_snapshot[table_ref] = ( datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc), table, ) - def get_table_mock(table_ref): - table = google.cloud.bigquery.Table( - table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),) - ) - table._properties["numRows"] = "1000000000" - table._properties["location"] = session._location - return table - - session.bqclient.get_table = get_table_mock + session.bqclient.get_table.return_value = table session.bqclient.query_and_wait.return_value = ( {"total_count": 3, "distinct_count": 2}, ) @@ -373,7 +381,7 @@ def query_mock(query, *args, **kwargs): return session_query_mock(query, *args, **kwargs) - session.bqclient.query = query_mock + session.bqclient.query_and_wait = query_mock def get_table_mock(table_ref): table = google.cloud.bigquery.Table(