Skip to content

perf: Optimize dataframe-series alignment on axis=1 #732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 227 additions & 13 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,13 @@ def __init__(
self._transpose_cache: Optional[Block] = transpose_cache

@classmethod
def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
def from_local(
cls,
data: pd.DataFrame,
session: bigframes.Session,
*,
cache_transpose: bool = True,
) -> Block:
# Assumes caller has already converted datatypes to bigframes ones.
pd_data = data
column_labels = pd_data.columns
Expand All @@ -169,12 +175,21 @@ def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
pd_data = pd_data.reset_index(names=index_ids)
as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
return cls(
block = cls(
array_value,
column_labels=column_labels,
index_columns=index_ids,
index_labels=index_labels,
)
if cache_transpose:
try:
# this cache will help when aligning on axis=1
block = block.with_transpose_cache(
cls.from_local(data.T, session, cache_transpose=False)
)
except Exception:
pass
return block

@property
def index(self) -> BlockIndexProperties:
Expand Down Expand Up @@ -724,12 +739,18 @@ def with_column_labels(
f"The column labels size `{len(label_list)} ` should equal to the value"
+ f"columns size: {len(self.value_columns)}."
)
return Block(
block = Block(
self._expr,
index_columns=self.index_columns,
column_labels=label_list,
index_labels=self.index.names,
)
singleton_label = len(list(value)) == 1 and list(value)[0]
if singleton_label is not None and self._transpose_cache is not None:
new_cache, label_id = self._transpose_cache.create_constant(singleton_label)
new_cache = new_cache.set_index([label_id])
block = block.with_transpose_cache(new_cache)
return block

def with_transpose_cache(self, transposed: Block):
return Block(
Expand Down Expand Up @@ -1935,6 +1956,153 @@ def merge(
expr = joined_expr.promote_offsets(offset_index_id)
return Block(expr, index_columns=[offset_index_id], column_labels=labels)

def _align_both_axes(
    self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
    """Align another block along both rows and columns.

    Rows are aligned with a relational join using ``how``; column labels
    are aligned with an outer label join. Returns the row-aligned block,
    the combined column index, and one (left, right) expression pair per
    output column, where a label missing on one side becomes a NULL
    constant on that side.
    """
    # Row alignment: join on the index.
    aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)

    # Column alignment: an exact label match needs no indexers.
    if self.column_labels.equals(other.column_labels):
        columns = self.column_labels
        lcol_indexer: Sequence[int] = range(len(columns))
        rcol_indexer: Sequence[int] = range(len(columns))
    else:
        columns, lcol_indexer, rcol_indexer = self.column_labels.join(
            other.column_labels, how="outer", return_indexers=True
        )
        # pandas returns None for a side that is already in join order.
        if lcol_indexer is None:
            lcol_indexer = range(len(columns))
        if rcol_indexer is None:
            rcol_indexer = range(len(columns))

    def left_input_lookup(index: int) -> ex.Expression:
        # -1 marks a label absent on this side; substitute NULL.
        if index == -1:
            return ex.const(None)
        return ex.free_var(get_column_left[self.value_columns[index]])

    def right_input_lookup(index: int) -> ex.Expression:
        if index == -1:
            return ex.const(None)
        return ex.free_var(get_column_right[other.value_columns[index]])

    left_inputs = [left_input_lookup(i) for i in lcol_indexer]
    right_inputs = [right_input_lookup(i) for i in rcol_indexer]
    return aligned_block, columns, tuple(zip(left_inputs, right_inputs))

def _align_axis_0(
    self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
    """Align a single-column (series) block with this block along axis 0.

    Rows are joined using ``how``; every column of ``self`` is then paired
    with the single series column. Returns the joined block, this block's
    column labels, and the per-column expression pairs.
    """
    # Caller contract: ``other`` must hold exactly one value column.
    assert len(other.value_columns) == 1
    aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)

    series_post_join_id = get_column_right[other.value_columns[0]]
    inputs = tuple(
        (ex.free_var(get_column_left[col]), ex.free_var(series_post_join_id))
        for col in self.value_columns
    )
    return aligned_block, self.column_labels, inputs

def _align_series_block_axis_1(
    self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
    """Align a one-column series block with this block along axis=1.

    The series is transposed (which requires its transpose cache) into a
    single-row block, cross-joined against every row of ``self``, and its
    labels are joined against ``self.column_labels`` using ``how``.
    Returns the joined block, the combined column index, and one
    (left, right) expression pair per output column, with NULL constants
    where a label exists on only one side.
    """
    assert len(other.value_columns) == 1
    if other._transpose_cache is None:
        raise ValueError(
            "Wrong align method, this approach requires transpose cache"
        )

    # Compute the transpose exactly once. The previous version re-invoked
    # other.transpose() for the row join, the label comparison/join, and
    # the column lookup, repeating the same work three times.
    other_transposed = other.transpose()

    # Join rows: broadcast the single transposed row across self's rows.
    aligned_block, (get_column_left, get_column_right) = join_with_single_row(
        self, other_transposed
    )
    # Join columns schema.
    # Indexers will be None for an exact match.
    if self.column_labels.equals(other_transposed.column_labels):
        columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
    else:
        columns, lcol_indexer, rcol_indexer = self.column_labels.join(
            other_transposed.column_labels, how=how, return_indexers=True
        )
    lcol_indexer = (
        lcol_indexer if (lcol_indexer is not None) else range(len(columns))
    )
    rcol_indexer = (
        rcol_indexer if (rcol_indexer is not None) else range(len(columns))
    )

    def left_input_lookup(index: int) -> ex.Expression:
        # -1 marks a label absent on this side; substitute NULL.
        if index == -1:
            return ex.const(None)
        return ex.free_var(get_column_left[self.value_columns[index]])

    def right_input_lookup(index: int) -> ex.Expression:
        if index == -1:
            return ex.const(None)
        return ex.free_var(
            get_column_right[other_transposed.value_columns[index]]
        )

    left_inputs = [left_input_lookup(i) for i in lcol_indexer]
    right_inputs = [right_input_lookup(i) for i in rcol_indexer]
    return aligned_block, columns, tuple(zip(left_inputs, right_inputs))

def _align_pd_series_axis_1(
    self, other: pd.Series, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
    """Align a local (in-memory) pandas Series with this block along axis=1.

    The series values are embedded directly as constant expressions, so no
    row join is needed and the block is returned unchanged. Raises
    ``ValueError`` when the labels differ and either side is non-unique.
    """
    if self.column_labels.equals(other.index):
        columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
    else:
        # Joining non-unique label sets is ambiguous; refuse early.
        if not (self.column_labels.is_unique and other.index.is_unique):
            raise ValueError("Cannot align non-unique indices")
        columns, lcol_indexer, rcol_indexer = self.column_labels.join(
            other.index, how=how, return_indexers=True
        )
    # pandas returns None for a side that is already in join order.
    if lcol_indexer is None:
        lcol_indexer = range(len(columns))
    if rcol_indexer is None:
        rcol_indexer = range(len(columns))

    # -1 marks a label absent on that side; substitute NULL.
    left_inputs = [
        ex.free_var(self.value_columns[i]) if i != -1 else ex.const(None)
        for i in lcol_indexer
    ]
    right_inputs = [
        ex.const(other.iloc[i]) if i != -1 else ex.const(None)
        for i in rcol_indexer
    ]
    return self, columns, tuple(zip(left_inputs, right_inputs))

def _apply_binop(
    self,
    op: ops.BinaryOp,
    inputs: Sequence[Tuple[ex.Expression, ex.Expression]],
    labels: pd.Index,
    reverse: bool = False,
) -> Block:
    """Project ``op`` over each aligned (left, right) input pair.

    Produces one result column per pair, keeps only those result columns,
    and relabels them with ``labels``. When ``reverse`` is set, operand
    order is swapped (right op left).
    """
    current = self
    result_ids = []
    for lhs, rhs in inputs:
        if reverse:
            lhs, rhs = rhs, lhs
        current, result_id = current.project_expr(op.as_expr(lhs, rhs))
        result_ids.append(result_id)

    return current.select_columns(result_ids).with_column_labels(labels)

def join(
self,
other: Block,
Expand Down Expand Up @@ -2256,15 +2424,6 @@ def column_ids(self) -> Sequence[str]:
"""Column(s) to use as row labels."""
return self._block._index_columns

def __repr__(self) -> str:
    """Converts an Index to a string by materializing it as pandas."""
    # TODO(swast): Add a timeout here? If the query is taking a long time,
    # maybe we just print the job metadata that we have so far?
    # TODO(swast): Avoid downloading the whole index by using job
    # metadata, like we do with DataFrame.
    return repr(self.to_pandas())

def to_pandas(self) -> pd.Index:
"""Executes deferred operations and downloads the results."""
if len(self.column_ids) == 0:
Expand Down Expand Up @@ -2359,6 +2518,61 @@ def join_indexless(
)


def join_with_single_row(
    left: Block,
    single_row_block: Block,
) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]:
    """
    Special join case where other is a single row block.
    This property is not validated, caller responsible for not passing multi-row block.
    Preserves the index of the left block, ignoring the index label(s) of other.

    Returns the combined block plus the (left, right) mappings from original
    column ids to their post-join ids.
    """
    left_expr = left.expr
    # The right side's index is irrelevant for broadcasting a single row;
    # keep only its value columns.
    right_expr = single_row_block.expr.select_columns(single_row_block.value_columns)
    left_mappings = [
        join_defs.JoinColumnMapping(
            source_table=join_defs.JoinSide.LEFT,
            source_id=id,
            destination_id=guid.generate_guid(),
        )
        for id in left_expr.column_ids
    ]
    right_mappings = [
        join_defs.JoinColumnMapping(
            source_table=join_defs.JoinSide.RIGHT,
            source_id=id,
            destination_id=guid.generate_guid(),
        )
        for id in right_expr.column_ids
    ]

    # A cross join against exactly one right row attaches that row's values
    # to every left row without any join condition.
    join_def = join_defs.JoinDefinition(
        conditions=(),
        mappings=(*left_mappings, *right_mappings),
        type="cross",
    )
    combined_expr = left_expr.join(
        right_expr,
        join_def=join_def,
    )
    get_column_left = join_def.get_left_mapping()
    get_column_right = join_def.get_right_mapping()
    # Keep the left block's index columns under their post-join ids.
    index_cols_post_join = [get_column_left[id] for id in left.index_columns]

    block = Block(
        combined_expr,
        index_columns=index_cols_post_join,
        column_labels=left.column_labels.append(single_row_block.column_labels),
        # Use all index names so a multi-level left index is preserved; the
        # previous ``[left.index.name]`` supplied a single label even when
        # index_cols_post_join had several columns.
        index_labels=left.index.names,
    )
    return (
        block,
        (get_column_left, get_column_right),
    )


def join_mono_indexed(
left: Block,
right: Block,
Expand Down Expand Up @@ -2546,7 +2760,7 @@ def coalesce_columns(
) -> Tuple[core.ArrayValue, Sequence[str]]:
result_ids = []
for left_id, right_id in zip(left_ids, right_ids):
if how == "left" or how == "inner":
if how == "left" or how == "inner" or how == "cross":
result_ids.append(left_id)
expr = expr.drop_columns([right_id])
elif how == "right":
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def size(self) -> series.Series:
by_column_ids=self._by_col_ids,
dropna=self._dropna,
)
return series.Series(agg_block, name=self._value_name)
return series.Series(agg_block.with_column_labels([self._value_name]))

def skew(self, *args, **kwargs) -> series.Series:
block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)
Expand Down
Loading