Commit 9f0406e

refactor: New read node that defers ibis table instantiation (#709)
1 parent 968d825 commit 9f0406e

13 files changed: +564, -316 lines

bigframes/core/__init__.py

Lines changed: 33 additions & 1 deletion
@@ -14,12 +14,15 @@
 from __future__ import annotations

 from dataclasses import dataclass
+import datetime
 import functools
 import io
 import itertools
 import typing
-from typing import Iterable, Sequence
+from typing import Iterable, Optional, Sequence
+import warnings

+import google.cloud.bigquery
 import ibis.expr.types as ibis_types
 import pandas
 import pyarrow as pa
@@ -92,6 +95,35 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
         )
         return cls(node)

+    @classmethod
+    def from_table(
+        cls,
+        table: google.cloud.bigquery.Table,
+        schema: schemata.ArraySchema,
+        session: Session,
+        *,
+        predicate: Optional[str] = None,
+        at_time: Optional[datetime.datetime] = None,
+        primary_key: Sequence[str] = (),
+    ):
+        if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names):
+            warnings.warn(
+                "Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.",
+                bigframes.exceptions.PreviewWarning,
+            )
+        node = nodes.ReadTableNode(
+            project_id=table.reference.project,
+            dataset_id=table.reference.dataset_id,
+            table_id=table.reference.table_id,
+            physical_schema=tuple(table.schema),
+            total_order_cols=tuple(primary_key),
+            columns=schema,
+            at_time=at_time,
+            table_session=session,
+            sql_predicate=predicate,
+        )
+        return cls(node)
+
     @property
     def column_ids(self) -> typing.Sequence[str]:
         return self.schema.names
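
For orientation, a minimal sketch of how this new constructor might be invoked; the table name, predicate, primary key, and the ArraySchema.from_bq_table helper are assumptions made for the example, not part of this diff:

# Hypothetical caller (not part of this commit): builds a deferred ArrayValue
# from table metadata only; no ibis table object is created at this point.
import google.cloud.bigquery

import bigframes.core as core
import bigframes.core.schema as schemata


def read_table_deferred(session, bqclient: google.cloud.bigquery.Client):
    table = bqclient.get_table("my-project.my_dataset.my_table")  # metadata lookup only
    schema = schemata.ArraySchema.from_bq_table(table)  # assumed helper for this sketch
    return core.ArrayValue.from_table(
        table,
        schema=schema,
        session=session,
        predicate="status = 'active'",  # forwarded to ReadTableNode.sql_predicate
        primary_key=("id",),            # becomes total_order_cols on the node
    )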

bigframes/core/compile/compiler.py

Lines changed: 89 additions & 0 deletions
@@ -17,12 +17,20 @@
 import io
 import typing

+import ibis
+import ibis.backends
+import ibis.backends.bigquery
+import ibis.expr.types
 import pandas as pd

 import bigframes.core.compile.compiled as compiled
 import bigframes.core.compile.concat as concat_impl
+import bigframes.core.compile.default_ordering as default_ordering
+import bigframes.core.compile.schema_translator
 import bigframes.core.compile.single_column
 import bigframes.core.nodes as nodes
+import bigframes.core.ordering as bf_ordering
+import bigframes.dtypes as bigframes_dtypes

 if typing.TYPE_CHECKING:
     import bigframes.core
@@ -88,6 +96,87 @@ def compile_readlocal(node: nodes.ReadLocalNode, ordered: bool = True):
     return ordered_ir.to_unordered()


+@_compile_node.register
+def compile_readtable(node: nodes.ReadTableNode, ordered: bool = True):
+    if ordered:
+        return compile_read_table_ordered(node)
+    else:
+        return compile_read_table_unordered(node)
+
+
+def read_table_as_unordered_ibis(node: nodes.ReadTableNode) -> ibis.expr.types.Table:
+    full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}"
+    used_columns = (
+        *node.schema.names,
+        *[i for i in node.total_order_cols if i not in node.schema.names],
+    )
+    # Physical schema might include unused columns, unsupported datatypes like JSON
+    physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis(
+        list(i for i in node.physical_schema if i.name in used_columns)
+    )
+    if node.at_time is not None or node.sql_predicate is not None:
+        import bigframes.session._io.bigquery
+
+        sql = bigframes.session._io.bigquery.to_query(
+            full_table_name,
+            columns=used_columns,
+            sql_predicate=node.sql_predicate,
+            time_travel_timestamp=node.at_time,
+        )
+        return ibis.backends.bigquery.Backend().sql(schema=physical_schema, query=sql)
+    else:
+        return ibis.table(physical_schema, full_table_name)
+
+
+def compile_read_table_unordered(node: nodes.ReadTableNode):
+    ibis_table = read_table_as_unordered_ibis(node)
+    return compiled.UnorderedIR(
+        ibis_table,
+        tuple(
+            bigframes_dtypes.ibis_value_to_canonical_type(ibis_table[col])
+            for col in node.schema.names
+        ),
+    )
+
+
+def compile_read_table_ordered(node: nodes.ReadTableNode):
+    ibis_table = read_table_as_unordered_ibis(node)
+    if node.total_order_cols:
+        ordering_value_columns = tuple(
+            bf_ordering.ascending_over(col) for col in node.total_order_cols
+        )
+        if node.order_col_is_sequential:
+            integer_encoding = bf_ordering.IntegerEncoding(
+                is_encoded=True, is_sequential=True
+            )
+        else:
+            integer_encoding = bf_ordering.IntegerEncoding()
+        ordering = bf_ordering.ExpressionOrdering(
+            ordering_value_columns,
+            integer_encoding=integer_encoding,
+            total_ordering_columns=frozenset(node.total_order_cols),
+        )
+        hidden_columns = ()
+    else:
+        ibis_table, ordering = default_ordering.gen_default_ordering(
+            ibis_table, use_double_hash=True
+        )
+        hidden_columns = tuple(
+            ibis_table[col]
+            for col in ibis_table.columns
+            if col not in node.schema.names
+        )
+    return compiled.OrderedIR(
+        ibis_table,
+        columns=tuple(
+            bigframes_dtypes.ibis_value_to_canonical_type(ibis_table[col])
+            for col in node.schema.names
+        ),
+        ordering=ordering,
+        hidden_ordering_columns=hidden_columns,
+    )
+
+
 @_compile_node.register
 def compile_readgbq(node: nodes.ReadGbqNode, ordered: bool = True):
     if ordered:
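
The handlers above are wired into the compiler through @_compile_node.register, one handler per node type. A standalone sketch of that dispatch pattern, assuming _compile_node is a functools.singledispatch function (the node class below is a stand-in, not a bigframes type):

import functools
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeReadTableNode:  # stand-in for nodes.ReadTableNode
    table_id: str


@functools.singledispatch
def _compile_node(node, ordered: bool = True):
    raise ValueError(f"Can't compile unrecognized node: {node}")


@_compile_node.register
def _(node: FakeReadTableNode, ordered: bool = True):
    # A real handler returns ordered/unordered IR; this one just echoes.
    return f"read {node.table_id} (ordered={ordered})"


print(_compile_node(FakeReadTableNode("my_table"), ordered=False))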
bigframes/core/compile/default_ordering.py

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Private helpers for loading a BigQuery table as a BigQuery DataFrames DataFrame.
+"""
+
+from __future__ import annotations
+
+import itertools
+from typing import cast
+
+import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
+import ibis
+import ibis.backends
+import ibis.expr.datatypes as ibis_dtypes
+import ibis.expr.types as ibis_types
+
+import bigframes.core.guid as guid
+import bigframes.core.ordering as order
+
+
+def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue:
+    col_type = column.type()
+    if (
+        col_type.is_numeric()
+        or col_type.is_boolean()
+        or col_type.is_binary()
+        or col_type.is_temporal()
+    ):
+        result = column.cast(ibis_dtypes.String(nullable=True))
+    elif col_type.is_geospatial():
+        result = cast(ibis_types.GeoSpatialColumn, column).as_text()
+    elif col_type.is_string():
+        result = column
+    else:
+        # TO_JSON_STRING works with all data types, but isn't the most efficient
+        # Needed for JSON, STRUCT and ARRAY datatypes
+        result = vendored_ibis_ops.ToJsonString(column).to_expr()  # type: ignore
+    # Escape backslashes and use backslash as delineator
+    escaped = cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\")  # type: ignore
+    return cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped)
+
+
+def gen_default_ordering(table: ibis.table, use_double_hash: bool = True):
+    ordering_hash_part = guid.generate_guid("bigframes_ordering_")
+    ordering_hash_part2 = guid.generate_guid("bigframes_ordering_")
+    ordering_rand_part = guid.generate_guid("bigframes_ordering_")
+
+    # All inputs into hash must be non-null or resulting hash will be null
+    str_values = list(
+        map(lambda col: _convert_to_nonnull_string(table[col]), table.columns)
+    )
+    full_row_str = (
+        str_values[0].concat(*str_values[1:]) if len(str_values) > 1 else str_values[0]
+    )
+    full_row_hash = full_row_str.hash().name(ordering_hash_part)
+    # By modifying value slightly, we get another hash uncorrelated with the first
+    full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2)
+    # Used to disambiguate between identical rows (which will have identical hash)
+    random_value = ibis.random().name(ordering_rand_part)
+
+    order_values = (
+        [full_row_hash, full_row_hash_p2, random_value]
+        if use_double_hash
+        else [full_row_hash, random_value]
+    )
+
+    original_column_ids = table.columns
+    table_with_ordering = table.select(
+        itertools.chain(original_column_ids, order_values)
+    )
+
+    ordering = order.ExpressionOrdering(
+        ordering_value_columns=tuple(
+            order.ascending_over(col.get_name()) for col in order_values
+        ),
+        total_ordering_columns=frozenset(col.get_name() for col in order_values),
+    )
+    return table_with_ordering, ordering
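
The backslash handling in _convert_to_nonnull_string is what keeps the concatenated row string, and therefore the row hash, sensitive to column boundaries. A plain-Python sketch of the same encoding (illustration only; the real code builds the equivalent ibis expressions and the hashing runs in BigQuery):

def encode_row(values):
    # Mirror of the ibis expression: double any backslashes in the value, then
    # prefix each value with a single backslash before concatenating.
    return "".join("\\" + str(v).replace("\\", "\\\\") for v in values)


# Without the delimiter, ("ab", "c") and ("a", "bc") concatenate to the same
# string and would hash identically; with it, the boundary is preserved.
assert "ab" + "c" == "a" + "bc"
assert encode_row(["ab", "c"]) != encode_row(["a", "bc"])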
bigframes/core/compile/schema_translator.py

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from __future__ import annotations
+
+import ibis
+import ibis.expr.schema
+
+import bigframes.core.schema as bf_schema
+import bigframes.dtypes
+
+
+def convert_bf_schema(schema: bf_schema.ArraySchema) -> ibis.expr.schema.Schema:
+    """
+    Convert bigframes schema to ibis schema. This is unambiguous as every bigframes type is backed by a specific SQL/ibis dtype.
+    """
+    names = schema.names
+    types = [
+        bigframes.dtypes.bigframes_dtype_to_ibis_dtype(bf_type)
+        for bf_type in schema.dtypes
+    ]
+    return ibis.schema(names=names, types=types)
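
For reference, the return value is a plain ibis schema built from parallel name/type lists; a minimal example with arbitrary columns:

import ibis

# Arbitrary example columns; convert_bf_schema produces a schema of this shape
# after mapping each bigframes dtype to its ibis dtype.
example = ibis.schema(names=["id", "name", "score"], types=["int64", "string", "float64"])
print(example)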

bigframes/core/nodes.py

Lines changed: 64 additions & 0 deletions
@@ -16,11 +16,14 @@

 import abc
 from dataclasses import dataclass, field, fields, replace
+import datetime
 import functools
 import itertools
 import typing
 from typing import Callable, Tuple

+import google.cloud.bigquery as bq
+
 import bigframes.core.expression as ex
 import bigframes.core.guid
 from bigframes.core.join_def import JoinColumnMapping, JoinDefinition, JoinSide
@@ -345,6 +348,67 @@ def transform_children(
         return self


+## Put ordering in here or just add order_by node above?
+@dataclass(frozen=True)
+class ReadTableNode(BigFrameNode):
+    project_id: str = field()
+    dataset_id: str = field()
+    table_id: str = field()
+
+    physical_schema: Tuple[bq.SchemaField, ...] = field()
+    # Subset of physical schema columns, with chosen BQ types
+    columns: schemata.ArraySchema = field()
+
+    table_session: bigframes.session.Session = field()
+    # Empty tuple if no primary key (primary key can be any set of columns that together form a unique key)
+    # Empty if no known unique key
+    total_order_cols: Tuple[str, ...] = field()
+    # indicates a primary key that is exactly offsets 0, 1, 2, ..., N-2, N-1
+    order_col_is_sequential: bool = False
+    at_time: typing.Optional[datetime.datetime] = None
+    # Added for backwards compatibility, not validated
+    sql_predicate: typing.Optional[str] = None
+
+    def __post_init__(self):
+        # enforce invariants
+        physical_names = set(map(lambda i: i.name, self.physical_schema))
+        if not set(self.columns.names).issubset(physical_names):
+            raise ValueError(
+                f"Requested schema {self.columns} cannot be derived from table schema {self.physical_schema}"
+            )
+        if self.order_col_is_sequential and len(self.total_order_cols) != 1:
+            raise ValueError("Sequential primary key must have only one component")
+
+    @property
+    def session(self):
+        return self.table_session
+
+    def __hash__(self):
+        return self._node_hash
+
+    @property
+    def roots(self) -> typing.Set[BigFrameNode]:
+        return {self}
+
+    @property
+    def schema(self) -> schemata.ArraySchema:
+        return self.columns
+
+    @property
+    def relation_ops_created(self) -> int:
+        # Assume worst case, where readgbq actually has baked in analytic operation to generate index
+        return 3
+
+    @functools.cached_property
+    def variables_introduced(self) -> int:
+        return len(self.schema.items) + 1
+
+    def transform_children(
+        self, t: Callable[[BigFrameNode], BigFrameNode]
+    ) -> BigFrameNode:
+        return self
+
+
 # Unary nodes
 @dataclass(frozen=True)
 class PromoteOffsetsNode(UnaryNode):
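
To make the "deferred ibis table instantiation" idea concrete outside the library, a self-contained toy sketch: the node holds only metadata, and a query is rendered only when a compile step asks for it. The class, function, and SQL rendering below are invented for illustration and are not bigframes code.

from __future__ import annotations

import datetime
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToyReadTableNode:
    # Pure metadata, analogous to ReadTableNode above: nothing touches a backend here.
    project_id: str
    dataset_id: str
    table_id: str
    columns: Tuple[str, ...]
    sql_predicate: Optional[str] = None
    at_time: Optional[datetime.datetime] = None


def toy_compile(node: ToyReadTableNode) -> str:
    # Only at compile time does the metadata become a concrete query.
    sql = (
        f"SELECT {', '.join(node.columns)} "
        f"FROM `{node.project_id}.{node.dataset_id}.{node.table_id}`"
    )
    if node.at_time is not None:
        sql += f" FOR SYSTEM_TIME AS OF TIMESTAMP '{node.at_time.isoformat()}'"
    if node.sql_predicate is not None:
        sql += f" WHERE {node.sql_predicate}"
    return sql


node = ToyReadTableNode("my-project", "my_dataset", "my_table", ("id", "name"))
print(toy_compile(node))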
