refactor: New read node that defers ibis table instantiation #709

Merged: 12 commits, May 30, 2024
34 changes: 33 additions & 1 deletion bigframes/core/__init__.py
@@ -14,12 +14,15 @@
from __future__ import annotations

from dataclasses import dataclass
import datetime
import functools
import io
import itertools
import typing
from typing import Iterable, Sequence
from typing import Iterable, Optional, Sequence
import warnings

import google.cloud.bigquery
import ibis.expr.types as ibis_types
import pandas
import pyarrow as pa
@@ -92,6 +95,35 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
)
return cls(node)

@classmethod
def from_table(
cls,
table: google.cloud.bigquery.Table,
schema: schemata.ArraySchema,
session: Session,
*,
predicate: Optional[str] = None,
at_time: Optional[datetime.datetime] = None,
primary_key: Sequence[str] = (),
):
if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names):
warnings.warn(
"Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.",
bigframes.exceptions.PreviewWarning,
)
node = nodes.ReadTableNode(
project_id=table.reference.project,
dataset_id=table.reference.dataset_id,
table_id=table.reference.table_id,
physical_schema=tuple(table.schema),
total_order_cols=tuple(primary_key),
columns=schema,
at_time=at_time,
table_session=session,
sql_predicate=predicate,
)
return cls(node)

@property
def column_ids(self) -> typing.Sequence[str]:
return self.schema.names
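An aside on the JSON warning in from_table above: only columns actually requested in the bigframes schema trigger it. A minimal standalone sketch of that check, assuming a hypothetical Field stand-in for google.cloud.bigquery.SchemaField and the default UserWarning instead of bigframes.exceptions.PreviewWarning:

import warnings
from typing import NamedTuple, Sequence


class Field(NamedTuple):
    # Hypothetical stand-in for google.cloud.bigquery.SchemaField.
    name: str
    field_type: str


def warn_on_json_columns(physical_schema: Sequence[Field], requested: Sequence[str]) -> None:
    # Mirrors the from_table check: only requested columns matter.
    if any(f.field_type == "JSON" for f in physical_schema if f.name in requested):
        warnings.warn(
            "Interpreting JSON column(s) as StringDtype. "
            "This behavior may change in future versions."
        )


# Emits the warning, since the requested "payload" column is JSON:
warn_on_json_columns(
    [Field("id", "INTEGER"), Field("payload", "JSON")],
    requested=["id", "payload"],
)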
89 changes: 89 additions & 0 deletions bigframes/core/compile/compiler.py
@@ -17,12 +17,20 @@
import io
import typing

import ibis
import ibis.backends
import ibis.backends.bigquery
import ibis.expr.types
import pandas as pd

import bigframes.core.compile.compiled as compiled
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.default_ordering as default_ordering
import bigframes.core.compile.schema_translator
import bigframes.core.compile.single_column
import bigframes.core.nodes as nodes
import bigframes.core.ordering as bf_ordering
import bigframes.dtypes as bigframes_dtypes

if typing.TYPE_CHECKING:
import bigframes.core
@@ -88,6 +96,87 @@ def compile_readlocal(node: nodes.ReadLocalNode, ordered: bool = True):
return ordered_ir.to_unordered()


@_compile_node.register
def compile_readtable(node: nodes.ReadTableNode, ordered: bool = True):
if ordered:
return compile_read_table_ordered(node)
else:
return compile_read_table_unordered(node)


def read_table_as_unordered_ibis(node: nodes.ReadTableNode) -> ibis.expr.types.Table:
full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}"
used_columns = (
*node.schema.names,
*[i for i in node.total_order_cols if i not in node.schema.names],
)
# Physical schema might include unused columns and unsupported datatypes like JSON
physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis(
list(i for i in node.physical_schema if i.name in used_columns)
)
if node.at_time is not None or node.sql_predicate is not None:
import bigframes.session._io.bigquery

sql = bigframes.session._io.bigquery.to_query(
full_table_name,
columns=used_columns,
sql_predicate=node.sql_predicate,
time_travel_timestamp=node.at_time,
)
return ibis.backends.bigquery.Backend().sql(schema=physical_schema, query=sql)
else:
return ibis.table(physical_schema, full_table_name)


def compile_read_table_unordered(node: nodes.ReadTableNode):
ibis_table = read_table_as_unordered_ibis(node)
return compiled.UnorderedIR(
ibis_table,
tuple(
bigframes_dtypes.ibis_value_to_canonical_type(ibis_table[col])
for col in node.schema.names
),
)


def compile_read_table_ordered(node: nodes.ReadTableNode):
ibis_table = read_table_as_unordered_ibis(node)
if node.total_order_cols:
ordering_value_columns = tuple(
bf_ordering.ascending_over(col) for col in node.total_order_cols
)
if node.order_col_is_sequential:
integer_encoding = bf_ordering.IntegerEncoding(
is_encoded=True, is_sequential=True
)
else:
integer_encoding = bf_ordering.IntegerEncoding()
ordering = bf_ordering.ExpressionOrdering(
ordering_value_columns,
integer_encoding=integer_encoding,
total_ordering_columns=frozenset(node.total_order_cols),
)
hidden_columns = ()
else:
ibis_table, ordering = default_ordering.gen_default_ordering(
ibis_table, use_double_hash=True
)
hidden_columns = tuple(
ibis_table[col]
for col in ibis_table.columns
if col not in node.schema.names
)
return compiled.OrderedIR(
ibis_table,
columns=tuple(
bigframes_dtypes.ibis_value_to_canonical_type(ibis_table[col])
for col in node.schema.names
),
ordering=ordering,
hidden_ordering_columns=hidden_columns,
)


@_compile_node.register
def compile_readgbq(node: nodes.ReadGbqNode, ordered: bool = True):
if ordered:
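A note on the branch in read_table_as_unordered_ibis: when at_time or sql_predicate is set, the read is routed through generated SQL rather than a bare ibis.table. A rough, simplified sketch of the query shape such a helper might produce (the real logic lives in bigframes.session._io.bigquery.to_query; quoting and escaping are glossed over here):

import datetime
from typing import Optional, Sequence


def to_query_sketch(
    table: str,
    columns: Sequence[str],
    sql_predicate: Optional[str] = None,
    time_travel_timestamp: Optional[datetime.datetime] = None,
) -> str:
    select_list = ", ".join(f"`{c}`" for c in columns) if columns else "*"
    sql = f"SELECT {select_list} FROM `{table}`"
    if time_travel_timestamp is not None:
        # BigQuery time travel: read the table as of a past timestamp.
        sql += f" FOR SYSTEM_TIME AS OF TIMESTAMP('{time_travel_timestamp.isoformat()}')"
    if sql_predicate:
        sql += f" WHERE {sql_predicate}"
    return sql


print(to_query_sketch(
    "my-project.my_dataset.my_table",
    ["id", "value"],
    sql_predicate="value > 0",
    time_travel_timestamp=datetime.datetime(2024, 5, 1, tzinfo=datetime.timezone.utc),
))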
91 changes: 91 additions & 0 deletions bigframes/core/compile/default_ordering.py
@@ -0,0 +1,91 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Private helpers for loading a BigQuery table as a BigQuery DataFrames DataFrame.
"""

from __future__ import annotations

import itertools
from typing import cast

import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
import ibis
import ibis.backends
import ibis.expr.datatypes as ibis_dtypes
import ibis.expr.types as ibis_types

import bigframes.core.guid as guid
import bigframes.core.ordering as order


def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue:
col_type = column.type()
if (
col_type.is_numeric()
or col_type.is_boolean()
or col_type.is_binary()
or col_type.is_temporal()
):
result = column.cast(ibis_dtypes.String(nullable=True))
elif col_type.is_geospatial():
result = cast(ibis_types.GeoSpatialColumn, column).as_text()
elif col_type.is_string():
result = column
else:
# TO_JSON_STRING works with all data types, but isn't the most efficient
# Needed for JSON, STRUCT and ARRAY datatypes
result = vendored_ibis_ops.ToJsonString(column).to_expr() # type: ignore
# Escape backslashes and use backslash as delimiter
escaped = cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\") # type: ignore
return cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped)


def gen_default_ordering(table: ibis.table, use_double_hash: bool = True):
ordering_hash_part = guid.generate_guid("bigframes_ordering_")
ordering_hash_part2 = guid.generate_guid("bigframes_ordering_")
ordering_rand_part = guid.generate_guid("bigframes_ordering_")

# All inputs into hash must be non-null or resulting hash will be null
str_values = list(
map(lambda col: _convert_to_nonnull_string(table[col]), table.columns)
)
full_row_str = (
str_values[0].concat(*str_values[1:]) if len(str_values) > 1 else str_values[0]
)
full_row_hash = full_row_str.hash().name(ordering_hash_part)
# By modifying value slightly, we get another hash uncorrelated with the first
full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2)
# Used to disambiguate between identical rows (which will have identical hash)
random_value = ibis.random().name(ordering_rand_part)

order_values = (
[full_row_hash, full_row_hash_p2, random_value]
if use_double_hash
else [full_row_hash, random_value]
)

original_column_ids = table.columns
table_with_ordering = table.select(
itertools.chain(original_column_ids, order_values)
)

ordering = order.ExpressionOrdering(
ordering_value_columns=tuple(
order.ascending_over(col.get_name()) for col in order_values
),
total_ordering_columns=frozenset(col.get_name() for col in order_values),
)
return table_with_ordering, ordering
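Why two hashes? A single 64-bit row hash can collide, so hashing a slightly perturbed copy of the row string gives a second, effectively independent key, and the random column breaks ties between genuinely identical rows. A plain-Python sketch of the same idea (hashlib stands in for the SQL hash the compiled query would use; names and encoding details are illustrative):

import hashlib
import random
from typing import Sequence, Tuple


def row_order_key(values: Sequence[object], use_double_hash: bool = True) -> Tuple:
    # Mirror _convert_to_nonnull_string: NULL -> "", backslashes doubled,
    # and a leading backslash per field acts as the delimiter.
    parts = [
        "\\" + (str(v).replace("\\", "\\\\") if v is not None else "")
        for v in values
    ]
    full_row_str = "".join(parts)
    h1 = int.from_bytes(hashlib.sha256(full_row_str.encode()).digest()[:8], "big")
    # Perturbing the input yields a second hash uncorrelated with the first.
    h2 = int.from_bytes(hashlib.sha256((full_row_str + "_").encode()).digest()[:8], "big")
    tiebreak = random.random()  # identical rows end up ordered arbitrarily
    return (h1, h2, tiebreak) if use_double_hash else (h1, tiebreak)


rows = [("b", 2), ("a", 1), ("a", 1)]
print(sorted(rows, key=row_order_key))  # deterministic except between duplicate rows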
34 changes: 34 additions & 0 deletions bigframes/core/compile/schema_translator.py
@@ -0,0 +1,34 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

import ibis
import ibis.expr.schema

import bigframes.core.schema as bf_schema
import bigframes.dtypes


def convert_bf_schema(schema: bf_schema.ArraySchema) -> ibis.expr.schema.Schema:
"""
Convert a bigframes schema to an ibis schema. This is unambiguous, as every bigframes type is backed by a specific SQL/ibis dtype.
"""
names = schema.names
types = [
bigframes.dtypes.bigframes_dtype_to_ibis_dtype(bf_type)
for bf_type in schema.dtypes
]
return ibis.schema(names=names, types=types)
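Since each bigframes dtype maps to exactly one ibis dtype, the conversion above is a per-column lookup. A hedged sketch with a hand-rolled subset of the mapping (the real conversion goes through bigframes.dtypes.bigframes_dtype_to_ibis_dtype):

import ibis

# Illustrative subset only; the real mapping in bigframes.dtypes covers
# every supported bigframes dtype.
_BF_TO_IBIS = {
    "Int64": "int64",
    "Float64": "float64",
    "boolean": "boolean",
    "string": "string",
}


def convert_schema_sketch(names, bf_dtypes):
    return ibis.schema(
        names=names,
        types=[_BF_TO_IBIS[t] for t in bf_dtypes],
    )


# Prints an ibis schema with id: int64, score: float64.
print(convert_schema_sketch(["id", "score"], ["Int64", "Float64"]))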
64 changes: 64 additions & 0 deletions bigframes/core/nodes.py
@@ -16,11 +16,14 @@

import abc
from dataclasses import dataclass, field, fields, replace
import datetime
import functools
import itertools
import typing
from typing import Callable, Tuple

import google.cloud.bigquery as bq

import bigframes.core.expression as ex
import bigframes.core.guid
from bigframes.core.join_def import JoinColumnMapping, JoinDefinition, JoinSide
@@ -345,6 +348,67 @@ def transform_children(
return self


## Put ordering in here or just add order_by node above?
@dataclass(frozen=True)
class ReadTableNode(BigFrameNode):
project_id: str = field()
dataset_id: str = field()
table_id: str = field()

physical_schema: Tuple[bq.SchemaField, ...] = field()
# Subset of physical schema columns, with chosen BQ types
columns: schemata.ArraySchema = field()

table_session: bigframes.session.Session = field()
# Empty tuple if no known unique key (a primary key can be any set of columns that together form a unique key)
total_order_cols: Tuple[str, ...] = field()
# indicates a primary key that is exactly offsets 0, 1, 2, ..., N-2, N-1
order_col_is_sequential: bool = False
at_time: typing.Optional[datetime.datetime] = None
# Added for backwards compatibility, not validated
sql_predicate: typing.Optional[str] = None
Comment on lines +369 to +370

Collaborator:
Fascinating. This implies some level of SQL compilation outside of this node. Should this be a structured "filters" object instead?

Contributor Author:
The original filters type is a bit too flexible, allowing potentially non-hashable tuples. I could convert the whole thing to tuples, I guess. Would there be a benefit to that approach?

Collaborator:
Hmm... Forcing compilation to a string doesn't seem like the right choice to me. Some namedtuple or frozen dataclass would make the most sense to me.

Collaborator:
If eq and frozen are both true, @dataclass will generate a __hash__() method for you by default.

https://docs.python.org/3/library/dataclasses.html#dataclasses.dataclass
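To make the hashability point concrete: a frozen dataclass with eq=True (the default) gets a generated __hash__, so a structured filters object can stay part of a hashable node as long as its own fields are hashable, i.e. tuples rather than lists. A small runnable illustration (FilterClause and Filters are hypothetical names, not part of this PR):

from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)  # frozen + default eq=True => __hash__ is generated
class FilterClause:
    column: str
    op: str
    value: str


@dataclass(frozen=True)
class Filters:
    # A tuple field keeps the dataclass hashable; a list field here would
    # make hash(...) raise TypeError at runtime.
    clauses: Tuple[FilterClause, ...]


f = Filters((FilterClause("value", ">", "0"),))
print(hash(f))  # works: generated field-wise __hash__
print(f == Filters((FilterClause("value", ">", "0"),)))  # True: field-wise __eq__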


def __post_init__(self):
# enforce invariants
physical_names = set(map(lambda i: i.name, self.physical_schema))
if not set(self.columns.names).issubset(physical_names):
raise ValueError(
f"Requested schema {self.columns} cannot be derived from table schemal {self.physical_schema}"
)
if self.order_col_is_sequential and len(self.total_order_cols) != 1:
raise ValueError("Sequential primary key must have only one component")

@property
def session(self):
return self.table_session

def __hash__(self):
return self._node_hash

@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}

@property
def schema(self) -> schemata.ArraySchema:
return self.columns

@property
def relation_ops_created(self) -> int:
# Assume worst case, where readgbq actually has a baked-in analytic operation to generate the index
return 3

@functools.cached_property
def variables_introduced(self) -> int:
return len(self.schema.items) + 1

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self


# Unary nodes
@dataclass(frozen=True)
class PromoteOffsetsNode(UnaryNode):