Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,18 +304,25 @@ def assign(self, source_id: str, destination_id: str) -> ArrayValue:
if destination_id in self.column_ids: # Mutate case
exprs = [
(
ex.deref(source_id if (col_id == destination_id) else col_id),
ids.ColumnId(col_id),
bigframes.core.nodes.AliasedRef(
ex.deref(source_id if (col_id == destination_id) else col_id),
ids.ColumnId(col_id),
)
)
for col_id in self.column_ids
]
else: # append case
self_projection = (
(ex.deref(col_id), ids.ColumnId(col_id)) for col_id in self.column_ids
bigframes.core.nodes.AliasedRef.identity(ids.ColumnId(col_id))
for col_id in self.column_ids
)
exprs = [
*self_projection,
(ex.deref(source_id), ids.ColumnId(destination_id)),
(
bigframes.core.nodes.AliasedRef(
ex.deref(source_id), ids.ColumnId(destination_id)
)
),
]
return ArrayValue(
nodes.SelectionNode(
Expand All @@ -337,7 +344,10 @@ def create_constant(

def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
# This basically just drops and reorders columns - logically a no-op except as a final step
selections = ((ex.deref(col_id), ids.ColumnId(col_id)) for col_id in column_ids)
selections = (
bigframes.core.nodes.AliasedRef.identity(ids.ColumnId(col_id))
for col_id in column_ids
)
return ArrayValue(
nodes.SelectionNode(
child=self.node,
Expand Down Expand Up @@ -488,7 +498,9 @@ def prepare_join_names(
nodes.SelectionNode(
other.node,
tuple(
(ex.deref(old_id), ids.ColumnId(new_id))
bigframes.core.nodes.AliasedRef(
ex.deref(old_id), ids.ColumnId(new_id)
)
for old_id, new_id in r_mapping.items()
),
),
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def _to_ibis_expr(
# Special case for empty tables, since we can't create an empty
# projection.
if not self._columns:
return bigframes_vendored.ibis.memtable([])
return self._table.select([bigframes_vendored.ibis.literal(1)])

table = self._table.select(self._columns)
if fraction is not None:
Expand Down
22 changes: 15 additions & 7 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ def compile_sql(
if ordered:
node, limit = rewrites.pullup_limit_from_slice(node)
node = nodes.bottom_up(node, rewrites.rewrite_slice)
# TODO: Extract out CTEs
node, ordering = rewrites.pull_up_order(
node, order_root=True, ordered_joins=self.strict
)
node = rewrites.column_pruning(node)
ir = self.compile_node(node)
return ir.to_sql(
order_by=ordering.all_ordering_columns,
Expand All @@ -76,6 +78,7 @@ def compile_sql(
node, _ = rewrites.pull_up_order(
node, order_root=False, ordered_joins=self.strict
)
node = rewrites.column_pruning(node)
ir = self.compile_node(node)
return ir.to_sql(selections=output_ids)

Expand All @@ -86,6 +89,7 @@ def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
node, _ = rewrites.pull_up_order(
node, order_root=False, ordered_joins=self.strict
)
node = rewrites.column_pruning(node)
return self.compile_node(node).to_sql(limit=n_rows, selections=ids)

def compile_raw(
Expand All @@ -97,6 +101,7 @@ def compile_raw(
node = nodes.bottom_up(node, rewrites.rewrite_slice)
node = nodes.top_down(node, rewrites.rewrite_timedelta_ops)
node, ordering = rewrites.pull_up_order(node, ordered_joins=self.strict)
node = rewrites.column_pruning(node)
ir = self.compile_node(node)
sql = ir.to_sql()
return sql, node.schema.to_bigquery(), ordering
Expand Down Expand Up @@ -192,31 +197,34 @@ def compile_readtable(self, node: nodes.ReadTableNode):
return self.compile_read_table_unordered(node.source, node.scan_list)

def read_table_as_unordered_ibis(
self, source: nodes.BigqueryDataSource
self,
source: nodes.BigqueryDataSource,
scan_cols: typing.Sequence[str],
) -> ibis_types.Table:
full_table_name = f"{source.table.project_id}.{source.table.dataset_id}.{source.table.table_id}"
used_columns = tuple(col.name for col in source.table.physical_schema)
# Physical schema might include unused columns, unsupported datatypes like JSON
physical_schema = ibis_bigquery.BigQuerySchema.to_ibis(
list(i for i in source.table.physical_schema if i.name in used_columns)
list(source.table.physical_schema)
)
if source.at_time is not None or source.sql_predicate is not None:
import bigframes.session._io.bigquery

sql = bigframes.session._io.bigquery.to_query(
full_table_name,
columns=used_columns,
columns=scan_cols,
sql_predicate=source.sql_predicate,
time_travel_timestamp=source.at_time,
)
return ibis_bigquery.Backend().sql(schema=physical_schema, query=sql)
else:
return ibis_api.table(physical_schema, full_table_name)
return ibis_api.table(physical_schema, full_table_name).select(scan_cols)

def compile_read_table_unordered(
self, source: nodes.BigqueryDataSource, scan: nodes.ScanList
):
ibis_table = self.read_table_as_unordered_ibis(source)
ibis_table = self.read_table_as_unordered_ibis(
source, scan_cols=[col.source_id for col in scan.items]
)
return compiled.UnorderedIR(
ibis_table,
tuple(
Expand Down Expand Up @@ -291,7 +299,7 @@ def set_output_names(
return nodes.SelectionNode(
node,
tuple(
(ex.DerefOp(old_id), ids.ColumnId(out_id))
bigframes.core.nodes.AliasedRef(ex.DerefOp(old_id), ids.ColumnId(out_id))
for old_id, out_id in zip(node.ids, output_ids)
),
)
Loading