Skip to content

Commit 475eeea

Browse files
committed
feat: (Series|DataFrame).explode
1 parent 9d8cf67 commit 475eeea

File tree

12 files changed

+195
-5
lines changed

12 files changed

+195
-5
lines changed

bigframes/core/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,19 @@ def join(
401401
return ArrayValue(bigframes.core.rewrite.maybe_rewrite_join(join_node))
402402
return ArrayValue(join_node)
403403

404+
def explode(self, column_ids: typing.Sequence[str]) -> ArrayValue:
    """Unnest array-valued columns into one output row per array element.

    Columns whose dtype is not array-like are silently dropped from the
    request; if no array-like column remains, the value is returned
    unchanged (no ExplodeNode is added to the plan).
    """
    array_cols = tuple(
        col_id
        for col_id in column_ids
        if bigframes.dtypes.is_array_like(self.get_column_type(col_id))
    )
    if not array_cols:
        return ArrayValue(self.node)
    return ArrayValue(nodes.ExplodeNode(child=self.node, column_ids=array_cols))
416+
404417
def _uniform_sampling(self, fraction: float) -> ArrayValue:
405418
"""Sampling the table on given fraction.
406419

bigframes/core/blocks.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,6 +1179,30 @@ def calculate_pairwise_metric(self, op=agg_ops.CorrOp()):
11791179
index_labels=self.column_labels.names,
11801180
)
11811181

1182+
def explode(
    self,
    column_ids: typing.Sequence[str],
    ignore_index: Optional[bool],
) -> Block:
    """Explode array columns into one row per element.

    Args:
        column_ids: internal ids of the columns to unnest.
        ignore_index: when truthy, replace the index with a fresh
            0..n-1 sequential index; otherwise keep the existing index.

    Returns:
        A new Block over the exploded expression.
    """
    expr = self.expr.explode(column_ids)
    # TODO: check multi-index
    # TODO: check ignore_index works if column_ids is empty.
    if ignore_index:
        new_index_col_id = guid.generate_guid("explode_index_")
        expr = expr.promote_offsets(new_index_col_id)
        expr = expr.drop_columns(self.index_columns)
        index_columns = [new_index_col_id]
        index_labels = [None]
    else:
        index_columns = list(self.index_columns)
        # Fix: preserve the block's INDEX labels here. The previous code used
        # `self.column_labels.names`, i.e. the labels of the value columns,
        # which mislabels the index whenever index and column labels differ.
        index_labels = self._index_labels
    return Block(
        expr,
        column_labels=self.column_labels,
        index_columns=index_columns,
        index_labels=index_labels,
    )
1205+
11821206
def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.UnaryAggregateOp]:
11831207
"""
11841208
Gets a standard set of stats to preemptively fetch for a column if

bigframes/core/compile/compiled.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import typing
2121
from typing import Collection, Iterable, Literal, Optional, Sequence
2222

23+
import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
2324
import ibis
2425
import ibis.backends.bigquery as ibis_bigquery
2526
import ibis.common.deferred # type: ignore
@@ -502,6 +503,16 @@ def _uniform_sampling(self, fraction: float) -> UnorderedIR:
502503
columns=columns,
503504
)
504505

506+
def explode(self, column_ids: typing.Sequence[str]) -> UnorderedIR:
    """Unnest array columns — NOT IMPLEMENTED for the unordered IR yet.

    NOTE(review): `column_ids` is currently ignored; this rebuilds the IR
    with the same table and columns, i.e. it is a stub/no-op pending a
    real implementation (see the ordered IR's explode for the intended
    semantics).
    """
    # TODO: HERE
    table = self._to_ibis_expr()
    columns = [table[column_name] for column_name in self._column_names]
    return UnorderedIR(
        table,
        columns=columns,
    )
515+
505516
## Helpers
506517
def _set_or_replace_by_id(
507518
self, id: str, new_value: ibis_types.Value
@@ -719,6 +730,89 @@ def _uniform_sampling(self, fraction: float) -> OrderedIR:
719730
ordering=self._ordering,
720731
)
721732

733+
def explode(self, column_ids: typing.Sequence[str]) -> OrderedIR:
    """Unnest the given array columns, one output row per array element.

    Strategy: build an array of element offsets, zip it with every target
    column so all arrays are unnested in lockstep, UNNEST the zipped array,
    then append the per-element offset to the ordering so row order stays
    total and deterministic.
    """
    table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True)

    offset_array_id = bigframes.core.guid.generate_guid("offset_array_")
    # Offsets 0..min(len)-1 across the target columns.
    # TODO: check when all columns are empty.
    # Fix: the previous assignment had a stray trailing comma, making
    # `offset_array` a 1-tuple rather than an expression; it only worked
    # because select() flattens sequences.
    offset_array = (
        vendored_ibis_ops.GenerateArray(
            ibis.greatest(
                0,
                ibis.least(
                    *[table[column_id].length() - 1 for column_id in column_ids]
                ),
            )
        )
        .to_expr()
        .name(offset_array_id)
    )
    table_w_offset = table.select(
        offset_array,
        *self._column_names,
        *self._hidden_ordering_column_names,
    )

    # TODO: file ibis bug, when column name is `array`
    zip_array_id = bigframes.core.guid.generate_guid("zip_array_")
    zip_array = (
        table_w_offset[offset_array_id]
        .zip(*[table_w_offset[column_id] for column_id in column_ids])
        .name(zip_array_id)
    )
    table_w_zip_array = table_w_offset.select(
        zip_array,
        *self._column_names,
        *self._hidden_ordering_column_names,
    )

    unnest_array_id = bigframes.core.guid.generate_guid("unnest_array_")
    unnest_offset_id = bigframes.core.guid.generate_guid("unnest_offset_")

    unnest_array = table_w_zip_array[zip_array_id].unnest().name(unnest_array_id)
    # zip() produces struct fields f1, f2, ...: f1 is the offset (zipped
    # first above), so the i-th target column lands in field f{i+2}.
    unnested_columns = [
        unnest_array[f"f{index + 2}"].name(column_id)
        for index, column_id in enumerate(column_ids)
    ]
    other_columns = [
        column_id for column_id in self._column_names if column_id not in column_ids
    ]
    table_w_unnest = table_w_zip_array.select(
        unnest_array["f1"].name(unnest_offset_id),
        *unnested_columns,
        *other_columns,
        *self._hidden_ordering_column_names,
    )

    columns = [table_w_unnest[column_name] for column_name in self._column_names]
    hidden_ordering_columns = [
        *[
            table_w_unnest[column_name]
            for column_name in self._hidden_ordering_column_names
        ],
        table_w_unnest[unnest_offset_id],
    ]
    # Extend the existing ordering with the element offset so rows coming
    # from the same source row keep their within-array order.
    ordering = ExpressionOrdering(
        ordering_value_columns=tuple(
            [
                *self._ordering.ordering_value_columns,
                ascending_over(unnest_offset_id),
            ]
        ),
        total_ordering_columns=frozenset(
            [*self._ordering.total_ordering_columns, unnest_offset_id]
        ),
    )

    return OrderedIR(
        table_w_unnest,
        columns=columns,
        hidden_ordering_columns=hidden_ordering_columns,
        ordering=ordering,
    )
815+
722816
def promote_offsets(self, col_id: str) -> OrderedIR:
723817
"""
724818
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.

bigframes/core/compile/compiler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ def compile_unpivot(node: nodes.UnpivotNode, ordered: bool = True):
191191
)
192192

193193

194+
@_compile_node.register
def compiler_explode(node: nodes.ExplodeNode, ordered: bool = True):
    """Compile an ExplodeNode by delegating to the child IR's explode()."""
    compiled_child = compile_node(node.child, ordered)
    return compiled_child.explode(node.column_ids)
197+
198+
194199
@_compile_node.register
195200
def compiler_random_sample(node: nodes.RandomSampleNode, ordered: bool = True):
196201
return compile_node(node.child, ordered)._uniform_sampling(node.fraction)

bigframes/core/nodes.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,3 +484,11 @@ def row_preserving(self) -> bool:
484484

485485
def __hash__(self):
486486
return self._node_hash
487+
488+
489+
@dataclass(frozen=True)
class ExplodeNode(UnaryNode):
    """Plan node that unnests array-valued columns into one row per element."""

    # Ids of the array columns to unnest; a tuple so the node stays hashable.
    column_ids: typing.Tuple[str, ...]

    def __hash__(self):
        # Use the precomputed node hash, matching the other node types in
        # this module.
        return self._node_hash

bigframes/dataframe.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2518,6 +2518,21 @@ def sample(
25182518
)[0]
25192519
)
25202520

2521+
def explode(
    self,
    column: str | typing.Sequence[str],
    *,
    ignore_index: Optional[bool] = False,
) -> DataFrame:
    """Transform each element of the given array column(s) into a row.

    Args:
        column: a single column label or a sequence of column labels to
            explode.
        ignore_index: if True, the result gets a fresh 0..n-1 index.

    Raises:
        ValueError: if no column is given or the same column is listed twice.
    """
    columns = list(column) if utils.is_list_like(column) else [column]
    if not columns:
        raise ValueError("column must be nonempty")
    # Fix: validate the NORMALIZED list, not the raw argument. The previous
    # check `len(column) > len(set(column))` treated a single string name as
    # a sequence of characters, so a column literally named e.g. "aa" would
    # wrongly fail the uniqueness check.
    if len(columns) > len(set(columns)):
        raise ValueError("column must be unique")
    return DataFrame(
        self._block.explode(column_ids=columns, ignore_index=ignore_index)
    )
2535+
25212536
def _split(
25222537
self,
25232538
ns: Iterable[int] = (),

bigframes/dtypes.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,19 @@ def is_string_like(type: ExpressionType) -> bool:
129129

130130

131131
def is_array_like(type: ExpressionType) -> bool:
    """Return True iff `type` is an Arrow list dtype (an array column)."""
    if not isinstance(type, pd.ArrowDtype):
        return False
    return isinstance(type.pyarrow_dtype, pa.ListType)
136135

137136

138137
def is_numeric(type: ExpressionType) -> bool:
    """Return True iff `type` is in the permissive numeric dtype set."""
    return type in NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
140139

141140

141+
def is_iterable(type: ExpressionType) -> bool:
    """Return True for element-iterable dtypes: strings, bytes, and arrays."""
    return is_array_like(type) or type in (STRING_DTYPE, BYTES_DTYPE)
143+
144+
142145
def is_comparable(type: ExpressionType) -> bool:
    """Return True when values of `type` support ordering comparisons."""
    return (type is not None) and (type not in UNORDERED_DTYPES)
144147

bigframes/operations/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ def create_binary_op(
212212
# "len" accepts any element-iterable dtype (strings, bytes, arrays) and
# always yields an integer length.
len_op = create_unary_op(
    name="len",
    type_signature=op_typing.FixedOutputType(
        dtypes.is_iterable, dtypes.INT_DTYPE, description="iterable"
    ),
)
reverse_op = create_unary_op(name="reverse", type_signature=op_typing.STRING_TRANSFORM)

bigframes/series.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1546,6 +1546,17 @@ def sample(
15461546
)[0]
15471547
)
15481548

1549+
def explode(
    self,
    *,
    ignore_index: Optional[bool] = False,
) -> Series:
    """Transform each array element of this Series into its own row.

    With ignore_index=True the result receives a fresh 0..n-1 index.
    """
    exploded_block = self._block.explode(
        column_ids=[self._value_column], ignore_index=ignore_index
    )
    return Series(exploded_block)
1559+
15491560
def __array_ufunc__(
15501561
self, ufunc: numpy.ufunc, method: str, *inputs, **kwargs
15511562
) -> Series:

third_party/bigframes_vendored/ibis/backends/bigquery/registry.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,17 @@ def _to_json_string(translator, op: vendored_ibis_ops.ToJsonString):
2626
return f"TO_JSON_STRING({arg})"
2727

2828

29+
def _generate_array(translator, op: vendored_ibis_ops.GenerateArray):
30+
arg = translator.translate(op.arg)
31+
return f"GENERATE_ARRAY(0, {arg})"
32+
33+
2934
# Registry patches: route vendored (not-yet-upstreamed) ibis operations to
# their BigQuery SQL translation functions, then install them into the main
# ibis operation registry.
patched_ops = {
    vendored_ibis_ops.ApproximateMultiQuantile: _approx_quantiles,  # type:ignore
    vendored_ibis_ops.FirstNonNullValue: _first_non_null_value,  # type:ignore
    vendored_ibis_ops.LastNonNullValue: _last_non_null_value,  # type:ignore
    vendored_ibis_ops.ToJsonString: _to_json_string,  # type:ignore
    vendored_ibis_ops.GenerateArray: _generate_array,  # type:ignore
}

OPERATION_REGISTRY.update(patched_ops)

third_party/bigframes_vendored/ibis/expr/operations/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
from __future__ import annotations
33

44
from bigframes_vendored.ibis.expr.operations.analytic import * # noqa: F401 F403
5+
from bigframes_vendored.ibis.expr.operations.generic import * # noqa: F401 F403
56
from bigframes_vendored.ibis.expr.operations.json import * # noqa: F401 F403
67
from bigframes_vendored.ibis.expr.operations.reductions import * # noqa: F401 F403
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Contains code from https://github.com/ibis-project/ibis/blob/master/ibis/expr/operations/generic.py
2+
from __future__ import annotations
3+
4+
import ibis.expr.datatypes as dt
5+
from ibis.expr.operations.core import Unary
6+
7+
8+
# TODO: add this function to ibis
class GenerateArray(Unary):
    """Ibis operation for BigQuery's GENERATE_ARRAY(0, arg).

    Unary: takes a single integer argument (the inclusive upper bound).
    """
    # Output is always ARRAY<INT64>, regardless of the input integer dtype.
    dtype = dt.Array(dt.int64)

0 commit comments

Comments
 (0)