Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2077,14 +2077,12 @@ def merge(
result_columns = []
matching_join_labels = []

coalesced_ids = []
for left_id, right_id in zip(left_join_ids, right_join_ids):
joined_expr, coalesced_id = joined_expr.project_to_id(
ops.coalesce_op.as_expr(
get_column_left[left_id], get_column_right[right_id]
),
)
coalesced_ids.append(coalesced_id)
left_post_join_ids = tuple(get_column_left[id] for id in left_join_ids)
right_post_join_ids = tuple(get_column_right[id] for id in right_join_ids)

joined_expr, coalesced_ids = coalesce_columns(
joined_expr, left_post_join_ids, right_post_join_ids, how=how, drop=False
)

for col_id in self.value_columns:
if col_id in left_join_ids:
Expand All @@ -2102,7 +2100,6 @@ def merge(
result_columns.append(get_column_left[col_id])
for col_id in other.value_columns:
if col_id in right_join_ids:
key_part = right_join_ids.index(col_id)
if other.col_id_to_label[matching_right_id] in matching_join_labels:
pass
else:
Expand Down Expand Up @@ -2928,26 +2925,31 @@ def resolve_label_id(label: Label) -> str:
)


# TODO: Rewrite to only return expressions
def coalesce_columns(
expr: core.ArrayValue,
left_ids: typing.Sequence[str],
right_ids: typing.Sequence[str],
how: str,
drop: bool = True,
) -> Tuple[core.ArrayValue, Sequence[str]]:
result_ids = []
for left_id, right_id in zip(left_ids, right_ids):
if how == "left" or how == "inner" or how == "cross":
result_ids.append(left_id)
expr = expr.drop_columns([right_id])
if drop:
expr = expr.drop_columns([right_id])
elif how == "right":
result_ids.append(right_id)
expr = expr.drop_columns([left_id])
if drop:
expr = expr.drop_columns([left_id])
elif how == "outer":
coalesced_id = guid.generate_guid()
expr, coalesced_id = expr.project_to_id(
ops.coalesce_op.as_expr(left_id, right_id)
)
expr = expr.drop_columns([left_id, right_id])
if drop:
expr = expr.drop_columns([left_id, right_id])
result_ids.append(coalesced_id)
else:
raise ValueError(f"Unexpected join type: {how}. {constants.FEEDBACK_LINK}")
Expand Down