Commit 9e0b383

refactor: Simplify local arrow data management (#1591)
1 parent: 2ce891f

7 files changed: +65 additions, -72 deletions

bigframes/core/array_value.py

Lines changed: 2 additions & 9 deletions

@@ -16,15 +16,13 @@
 from dataclasses import dataclass
 import datetime
 import functools
-import io
 import typing
 from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
 import warnings
 
 import google.cloud.bigquery
 import pandas
 import pyarrow as pa
-import pyarrow.feather as pa_feather
 
 import bigframes.core.expression as ex
 import bigframes.core.guid
@@ -63,21 +61,16 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
         adapted_table = local_data.adapt_pa_table(arrow_table)
         schema = local_data.arrow_schema_to_bigframes(adapted_table.schema)
 
-        iobytes = io.BytesIO()
-        pa_feather.write_feather(adapted_table, iobytes)
-        # Scan all columns by default, we define this list as it can be pruned while preserving source_def
         scan_list = nodes.ScanList(
             tuple(
                 nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
                 for item in schema.items
             )
         )
-
+        data_source = local_data.ManagedArrowTable(adapted_table, schema)
         node = nodes.ReadLocalNode(
-            iobytes.getvalue(),
-            data_schema=schema,
+            data_source,
             session=session,
-            n_rows=arrow_table.num_rows,
             scan_list=scan_list,
         )
        return cls(node)
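The net effect: ingesting local data no longer round-trips the pyarrow table through Feather bytes; the table is wrapped as-is. Below is a minimal sketch of the old serialization path this removes, assuming only pyarrow is installed (names here are illustrative, not the bigframes API):

import io

import pyarrow as pa
import pyarrow.feather as pa_feather

table = pa.table({"x": [1, 2, 3]})

# Old path: serialize to Feather bytes at ingest, deserialize again in every consumer.
iobytes = io.BytesIO()
pa_feather.write_feather(table, iobytes)
feather_bytes = iobytes.getvalue()
round_tripped = pa_feather.read_table(io.BytesIO(feather_bytes))
assert round_tripped.equals(table)

# New path: keep the pa.Table itself and hand it to ManagedArrowTable
# (defined in bigframes/core/local_data.py below), with no copy required.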

bigframes/core/compile/compiled.py

Lines changed: 8 additions & 32 deletions

@@ -26,15 +26,15 @@
 from bigframes_vendored.ibis.expr.operations import window as ibis_expr_window
 import bigframes_vendored.ibis.expr.operations as ibis_ops
 import bigframes_vendored.ibis.expr.types as ibis_types
-import pandas
+from google.cloud import bigquery
+import pyarrow as pa
 
 import bigframes.core.compile.aggregate_compiler as agg_compiler
 import bigframes.core.compile.googlesql
 import bigframes.core.compile.ibis_types
 import bigframes.core.compile.scalar_op_compiler as op_compilers
 import bigframes.core.compile.scalar_op_compiler as scalar_op_compiler
 import bigframes.core.expression as ex
-import bigframes.core.guid
 from bigframes.core.ordering import OrderingExpression
 import bigframes.core.sql
 from bigframes.core.window_spec import RangeWindowBounds, RowsWindowBounds, WindowSpec
@@ -279,11 +279,8 @@ def _reproject_to_table(self) -> UnorderedIR:
         )
 
     @classmethod
-    def from_pandas(
-        cls,
-        pd_df: pandas.DataFrame,
-        scan_cols: bigframes.core.nodes.ScanList,
-        offsets: typing.Optional[str] = None,
+    def from_polars(
+        cls, pa_table: pa.Table, schema: Sequence[bigquery.SchemaField]
     ) -> UnorderedIR:
         # TODO: add offsets
         """
@@ -292,37 +289,16 @@ def from_pandas(
         Assumed that the dataframe has unique string column names and bigframes-suppported
         dtypes.
         """
+        import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
 
-        # ibis memtable cannot handle NA, must convert to None
-        # this destroys the schema however
-        ibis_values = pd_df.astype("object").where(pandas.notnull(pd_df), None)  # type: ignore
-        if offsets:
-            ibis_values = ibis_values.assign(**{offsets: range(len(pd_df))})
         # derive the ibis schema from the original pandas schema
-        ibis_schema = [
-            (
-                local_label,
-                bigframes.core.compile.ibis_types.bigframes_dtype_to_ibis_dtype(dtype),
-            )
-            for id, dtype, local_label in scan_cols.items
-        ]
-        if offsets:
-            ibis_schema.append((offsets, ibis_dtypes.int64))
-
         keys_memtable = bigframes_vendored.ibis.memtable(
-            ibis_values, schema=bigframes_vendored.ibis.schema(ibis_schema)
+            pa_table,
+            schema=third_party_ibis_bqtypes.BigQuerySchema.to_ibis(list(schema)),
         )
-
-        columns = [
-            keys_memtable[local_label].name(col_id.sql)
-            for col_id, _, local_label in scan_cols.items
-        ]
-        if offsets:
-            columns.append(keys_memtable[offsets].name(offsets))
-
         return cls(
             keys_memtable,
-            columns=columns,
+            columns=tuple(keys_memtable[key] for key in keys_memtable.columns),
         )
 
     def join(
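The old from_pandas path had to scrub NA values to None (destroying dtypes along the way) and rebuild an ibis schema by hand; an ibis memtable can ingest a pyarrow table with an explicit schema directly. A minimal sketch using the plain ibis package (an assumption for illustration; the real code uses bigframes' vendored ibis fork plus BigQuerySchema.to_ibis to translate google.cloud.bigquery SchemaFields):

import ibis
import pyarrow as pa

pa_table = pa.table({"a": [1, None, 3], "b": ["x", "y", "z"]})

# Nulls and dtypes survive intact; no astype("object") workaround needed.
memtable = ibis.memtable(pa_table)
columns = tuple(memtable[key] for key in memtable.columns)
print(columns)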

bigframes/core/compile/compiler.py

Lines changed: 14 additions & 15 deletions

@@ -14,24 +14,20 @@
 from __future__ import annotations
 
 import functools
-import io
 import typing
 
 import bigframes_vendored.ibis.backends.bigquery as ibis_bigquery
 import bigframes_vendored.ibis.expr.api as ibis_api
 import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
 import bigframes_vendored.ibis.expr.types as ibis_types
 import google.cloud.bigquery
-import pandas as pd
+import pyarrow as pa
 
 from bigframes import dtypes, operations
-from bigframes.core import utils
 import bigframes.core.compile.compiled as compiled
 import bigframes.core.compile.concat as concat_impl
 import bigframes.core.compile.explode
-import bigframes.core.compile.ibis_types
 import bigframes.core.compile.scalar_op_compiler as compile_scalar
-import bigframes.core.compile.schema_translator
 import bigframes.core.nodes as nodes
 import bigframes.core.ordering as bf_ordering
 import bigframes.core.rewrite as rewrites
@@ -161,19 +157,22 @@ def compile_fromrange(
 
 @_compile_node.register
 def compile_readlocal(node: nodes.ReadLocalNode, *args):
-    array_as_pd = pd.read_feather(
-        io.BytesIO(node.feather_bytes),
-        columns=[item.source_id for item in node.scan_list.items],
-    )
-
-    # Convert timedeltas to microseconds for compatibility with BigQuery
-    _ = utils.replace_timedeltas_with_micros(array_as_pd)
-
     offsets = node.offsets_col.sql if node.offsets_col else None
-    return compiled.UnorderedIR.from_pandas(
-        array_as_pd, node.scan_list, offsets=offsets
+    pa_table = node.local_data_source.data
+    bq_schema = node.schema.to_bigquery()
+
+    pa_table = pa_table.select(list(item.source_id for item in node.scan_list.items))
+    pa_table = pa_table.rename_columns(
+        {item.source_id: item.id.sql for item in node.scan_list.items}
     )
 
+    if offsets:
+        pa_table = pa_table.append_column(
+            offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
+        )
+        bq_schema = (*bq_schema, google.cloud.bigquery.SchemaField(offsets, "INT64"))
+    return compiled.UnorderedIR.from_polars(pa_table, bq_schema)
+
 
 @_compile_node.register
 def compile_readtable(node: nodes.ReadTableNode, *args):
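compile_readlocal now prunes, renames, and optionally appends an offsets column with pure pyarrow table operations instead of a pandas detour. A standalone sketch of those operations, assuming a pyarrow version whose rename_columns accepts a mapping (older releases take a positional list of names; the column names here are illustrative):

import pyarrow as pa

pa_table = pa.table({"col_a": [10, 20, 30], "col_b": ["x", "y", "z"]})

# Prune to the scanned columns, then map source ids to their SQL ids.
pa_table = pa_table.select(["col_a"])
pa_table = pa_table.rename_columns({"col_a": "bfcol_0"})

# Materialize sequential offsets the same way the compiler does.
offsets = "bfcol_offsets"
pa_table = pa_table.append_column(
    offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
)
assert pa_table.column_names == ["bfcol_0", "bfcol_offsets"]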

bigframes/core/compile/polars/compiler.py

Lines changed: 4 additions & 5 deletions

@@ -205,11 +205,10 @@ def compile_readlocal(self, node: nodes.ReadLocalNode):
         cols_to_read = {
             scan_item.source_id: scan_item.id.sql for scan_item in node.scan_list.items
         }
-        return (
-            pl.read_ipc(node.feather_bytes, columns=list(cols_to_read.keys()))
-            .lazy()
-            .rename(cols_to_read)
-        )
+        lazy_frame = cast(
+            pl.DataFrame, pl.from_arrow(node.local_data_source.data)
+        ).lazy()
+        return lazy_frame.select(cols_to_read.keys()).rename(cols_to_read)
 
     @compile_node.register
     def compile_filter(self, node: nodes.FilterNode):
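The polars backend likewise swaps the IPC (Feather) read for pl.from_arrow, which is zero-copy for most Arrow types. A minimal sketch, assuming the polars and pyarrow packages (column names are illustrative):

from typing import cast

import polars as pl
import pyarrow as pa

arrow_table = pa.table({"src_a": [1, 2], "src_b": [3.0, 4.0]})
cols_to_read = {"src_a": "bfcol_0", "src_b": "bfcol_1"}

# from_arrow may return a DataFrame or a Series, hence the cast.
lazy_frame = cast(pl.DataFrame, pl.from_arrow(arrow_table)).lazy()
result = lazy_frame.select(cols_to_read.keys()).rename(cols_to_read)
print(result.collect())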

bigframes/core/local_data.py

Lines changed: 25 additions & 0 deletions

@@ -16,12 +16,37 @@
 
 from __future__ import annotations
 
+import dataclasses
+import functools
+import uuid
+
 import pyarrow as pa
 
 import bigframes.core.schema as schemata
 import bigframes.dtypes
 
 
+@dataclasses.dataclass(frozen=True)
+class LocalTableMetadata:
+    total_bytes: int
+    row_count: int
+
+    @classmethod
+    def from_arrow(cls, table: pa.Table):
+        return cls(total_bytes=table.nbytes, row_count=table.num_rows)
+
+
+@dataclasses.dataclass(frozen=True)
+class ManagedArrowTable:
+    data: pa.Table = dataclasses.field(hash=False)
+    schema: schemata.ArraySchema = dataclasses.field(hash=False)
+    id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)
+
+    @functools.cached_property
+    def metadata(self):
+        return LocalTableMetadata.from_arrow(self.data)
+
+
 def arrow_schema_to_bigframes(arrow_schema: pa.Schema) -> schemata.ArraySchema:
     """Infer the corresponding bigframes schema given a pyarrow schema."""
     schema_items = tuple(
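ManagedArrowTable is the new single handle for local data: the pa.Table and schema are excluded from hashing (hash=False), identity comes from an auto-generated uuid, and size statistics are computed lazily. A usage sketch, assuming this commit's module and its adapt_pa_table helper:

import pyarrow as pa

import bigframes.core.local_data as local_data

arrow_table = pa.table({"x": [1, 2, 3]})
adapted = local_data.adapt_pa_table(arrow_table)
schema = local_data.arrow_schema_to_bigframes(adapted.schema)

managed = local_data.ManagedArrowTable(adapted, schema)
print(managed.metadata.row_count)    # 3, computed on first access, then cached
print(managed.metadata.total_bytes)  # adapted.nbytes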

bigframes/core/nodes.py

Lines changed: 3 additions & 8 deletions

@@ -24,12 +24,10 @@
 
 import google.cloud.bigquery as bq
 
-from bigframes.core import identifiers
+from bigframes.core import identifiers, local_data
 from bigframes.core.bigframe_node import BigFrameNode, COLUMN_SET, Field
 import bigframes.core.expression as ex
-import bigframes.core.guid
 from bigframes.core.ordering import OrderingExpression
-import bigframes.core.schema as schemata
 import bigframes.core.slices as slices
 import bigframes.core.window_spec as window
 import bigframes.dtypes
@@ -579,11 +577,8 @@ class ScanList:
 
 @dataclasses.dataclass(frozen=True, eq=False)
 class ReadLocalNode(LeafNode):
-    # TODO: Combine feather_bytes, data_schema, n_rows into a LocalDataDef struct
     # TODO: Track nullability for local data
-    feather_bytes: bytes
-    data_schema: schemata.ArraySchema
-    n_rows: int
+    local_data_source: local_data.ManagedArrowTable
     # Mapping of local ids to bfet id.
     scan_list: ScanList
     # Offsets are generated only if this is non-null
@@ -623,7 +618,7 @@ def explicitly_ordered(self) -> bool:
 
     @property
     def row_count(self) -> typing.Optional[int]:
-        return self.n_rows
+        return self.local_data_source.metadata.row_count
 
     @property
     def node_defined_ids(self) -> Tuple[identifiers.ColumnId, ...]:
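With the three loose fields collapsed into local_data_source, row_count is derived from the wrapper's cached metadata instead of being stored on the node. A small sketch of the wrapper's identity semantics, assuming this commit's local_data module (ManagedArrowTable hashes only its uuid id field, so hashing stays cheap no matter how large the table is):

import pyarrow as pa

import bigframes.core.local_data as local_data

adapted = local_data.adapt_pa_table(pa.table({"x": [1, 2]}))
schema = local_data.arrow_schema_to_bigframes(adapted.schema)
source = local_data.ManagedArrowTable(adapted, schema)

hash(source)  # cheap: only the uuid participates in the hash
assert source.metadata.row_count == 2  # derived, not stored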

third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py

Lines changed: 9 additions & 3 deletions

@@ -3,6 +3,7 @@
 
 from __future__ import annotations
 
+import datetime
 import decimal
 import math
 import re
@@ -478,6 +479,11 @@ def visit_NonNullLiteral(self, op, *, value, dtype):
             return sge.convert(str(value))
 
         elif dtype.is_int64():
+            # allows directly using values out of a duration arrow array
+            if isinstance(value, datetime.timedelta):
+                value = (
+                    (value.days * 3600 * 24) + value.seconds
+                ) * 1_000_000 + value.microseconds
             return sge.convert(np.int64(value))
         return None
 
@@ -1024,7 +1030,7 @@ def visit_InMemoryTable(self, op, *, name, schema, data):
         # Avoid creating temp tables for small data, which is how memtable is
         # used in BigQuery DataFrames. Inspired by:
         # https://github.com/ibis-project/ibis/blob/efa6fb72bf4c790450d00a926d7bd809dade5902/ibis/backends/druid/compiler.py#L95
-        tuples = data.to_frame().itertuples(index=False)
+        rows = data.to_pyarrow(schema=None).to_pylist()  # type: ignore
         quoted = self.quoted
         columns = [sg.column(col, quoted=quoted) for col in schema.names]
         array_expr = sge.DataType(
@@ -1042,10 +1048,10 @@ def visit_InMemoryTable(self, op, *, name, schema, data):
                 sge.Struct(
                     expressions=tuple(
                         self.visit_Literal(None, value=value, dtype=type_)
-                        for value, type_ in zip(row, schema.types)
+                        for value, type_ in zip(row.values(), schema.types)
                     )
                 )
-                for row in tuples
+                for row in rows
             ]
         expr = sge.Unnest(
             expressions=[
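The new timedelta branch converts duration Arrow values to integral microseconds before emitting an INT64 literal. A quick standalone check of that arithmetic, assuming only the standard library (timedelta normalizes everything into days, seconds, and microseconds, so the conversion is exact):

import datetime

value = datetime.timedelta(days=1, seconds=2, microseconds=3)
micros = ((value.days * 3600 * 24) + value.seconds) * 1_000_000 + value.microseconds
assert micros == 86_402_000_003
assert micros == round(value.total_seconds() * 1_000_000)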
