
Commit b60feb4

Merge branch 'main' into release-please--branches--main
2 parents 9522bc8 + 3076a13 commit b60feb4

16 files changed: +192 −110 lines changed

bigframes/core/__init__.py

Lines changed: 12 additions & 3 deletions
@@ -177,7 +177,7 @@ def _compiled_schema(self) -> schemata.ArraySchema:
     def as_cached(
         self: ArrayValue,
         cache_table: google.cloud.bigquery.Table,
-        ordering: orderings.ExpressionOrdering,
+        ordering: Optional[orderings.ExpressionOrdering],
     ) -> ArrayValue:
         """
         Replace the node with an equivalent one that references a table where the value has been materialized to.
@@ -234,6 +234,8 @@ def promote_offsets(self, col_id: str) -> ArrayValue:
         """
         Convenience function to promote copy of column offsets to a value column. Can be used to reset index.
         """
+        if not self.session._strictly_ordered:
+            raise ValueError("Generating offsets not supported in unordered mode")
         return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id))

     def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
@@ -382,6 +384,10 @@ def project_window_op(
             never_skip_nulls: will disable null skipping for operators that would otherwise do so
             skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
         """
+        if not self.session._strictly_ordered:
+            # TODO: Support unbounded windows with aggregate ops and some row-order-independent analytic ops
+            # TODO: Support non-deterministic windowing
+            raise ValueError("Windowed ops not supported in unordered mode")
         return ArrayValue(
             nodes.WindowOpNode(
                 child=self.node,
@@ -433,8 +439,9 @@ def unpivot(
         """
         # There will be N labels, used to disambiguate which of N source columns produced each output row
         explode_offsets_id = bigframes.core.guid.generate_guid("unpivot_offsets_")
-        labels_array = self._create_unpivot_labels_array(row_labels, index_col_ids)
-        labels_array = labels_array.promote_offsets(explode_offsets_id)
+        labels_array = self._create_unpivot_labels_array(
+            row_labels, index_col_ids, explode_offsets_id
+        )

         # Unpivot creates N output rows for each input row, labels disambiguate these N rows
         joined_array = self._cross_join_w_labels(labels_array, join_side)
@@ -500,6 +507,7 @@ def _create_unpivot_labels_array(
         self,
         former_column_labels: typing.Sequence[typing.Hashable],
         col_ids: typing.Sequence[str],
+        offsets_id: str,
     ) -> ArrayValue:
         """Create an ArrayValue from a list of label tuples."""
         rows = []
@@ -510,6 +518,7 @@ def _create_unpivot_labels_array(
                 col_ids[i]: (row_label[i] if pandas.notnull(row_label[i]) else None)
                 for i in range(len(col_ids))
             }
+            row[offsets_id] = row_offset
            rows.append(row)

        return ArrayValue.from_pyarrow(pa.Table.from_pylist(rows), session=self.session)
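
The new guards mean order-dependent ArrayValue operations fail fast when the owning session is not strictly ordered. A minimal sketch of the expected behavior, assuming `values` is a bigframes.core.ArrayValue whose session has had the internal _strictly_ordered flag from this change switched off (the variable names are illustrative):

    # Sketch only: `values` is an ArrayValue; its session carries the new internal flag.
    values.session._strictly_ordered = False

    try:
        values.promote_offsets("offsets_col")  # offsets require a total ordering
    except ValueError as err:
        print(err)  # "Generating offsets not supported in unordered mode"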

bigframes/core/blocks.py

Lines changed: 26 additions & 8 deletions
@@ -553,7 +553,7 @@ def _materialize_local(
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
         _, query_job = self.session._query_to_destination(
-            self.session._to_sql(self.expr, sorted=True),
+            self.session._to_sql(self.expr, sorted=materialize_options.ordered),
             list(self.index_columns),
             api_name="cached",
             do_clustering=False,
@@ -1016,7 +1016,7 @@ def aggregate_all_and_stack(
                 index_columns=[index_id],
                 column_labels=self.column_labels,
                 index_labels=[None],
-            ).transpose(original_row_index=pd.Index([None]))
+            ).transpose(original_row_index=pd.Index([None]), single_row_mode=True)
         else:  # axis_n == 1
             # using offsets as identity to group on.
             # TODO: Allow to promote identity/total_order columns instead for better perf
@@ -1659,6 +1659,8 @@ def melt(
         value_vars=typing.Sequence[str],
         var_names=typing.Sequence[typing.Hashable],
         value_name: typing.Hashable = "value",
+        *,
+        create_offsets_index: bool = True,
     ):
         """
         Unpivot columns to produce longer, narrower dataframe.
@@ -1679,20 +1681,31 @@ def melt(
             index_col_ids=var_col_ids,
             join_side="right",
         )
-        index_id = guid.generate_guid()
-        unpivot_expr = unpivot_expr.promote_offsets(index_id)
+
+        if create_offsets_index:
+            index_id = guid.generate_guid()
+            unpivot_expr = unpivot_expr.promote_offsets(index_id)
+            index_cols = [index_id]
+        else:
+            index_cols = []
+
         # Need to reorder to get id_vars before var_col and unpivot_col
         unpivot_expr = unpivot_expr.select_columns(
-            [index_id, *id_vars, *var_col_ids, unpivot_col_id]
+            [*index_cols, *id_vars, *var_col_ids, unpivot_col_id]
         )

         return Block(
             unpivot_expr,
             column_labels=[*id_labels, *var_names, value_name],
-            index_columns=[index_id],
+            index_columns=index_cols,
         )

-    def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block:
+    def transpose(
+        self,
+        *,
+        original_row_index: Optional[pd.Index] = None,
+        single_row_mode: bool = False,
+    ) -> Block:
         """Transpose the block. Will fail if dtypes aren't coercible to a common type or too many rows.
         Can provide the original_row_index directly if it is already known, otherwise a query is needed.
         """
@@ -1718,7 +1731,11 @@ def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block:
                 block.column_labels, pd.Index(range(len(block.column_labels)))
             )
         )
-        numbered_block, offsets = numbered_block.promote_offsets()
+        # TODO: Determine if single row from expression tree (after aggregation without groupby)
+        if single_row_mode:
+            numbered_block, offsets = numbered_block.create_constant(0)
+        else:
+            numbered_block, offsets = numbered_block.promote_offsets()

         stacked_block = numbered_block.melt(
             id_vars=(offsets,),
@@ -1727,6 +1744,7 @@ def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block:
                 "col_offset",
             ),
             value_vars=block.value_columns,
+            create_offsets_index=False,
         )
         col_labels = stacked_block.value_columns[-2 - original_col_index.nlevels : -2]
         col_offset = stacked_block.value_columns[-2]  # disambiguator we created earlier
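
Block.melt now promotes an offsets index only on request, and transpose passes create_offsets_index=False because it rebuilds its own index from the melted columns. A rough sketch of the new keyword, assuming `block` is a bigframes.core.blocks.Block with single-level column labels (illustrative, not taken from this commit):

    melted = block.melt(
        id_vars=(),
        value_vars=block.value_columns,
        var_names=("variable",),
        create_offsets_index=False,  # new keyword: skip promoting an offsets index
    )
    # `melted` carries no index columns; the default (True) preserves the old behavior.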

bigframes/core/compile/compiler.py

Lines changed: 5 additions & 0 deletions
@@ -109,6 +109,11 @@ def compile_cached_table(node: nodes.CachedTableNode, ordered: bool = True):
     )
     ibis_table = ibis.table(physical_schema, full_table_name)
     if ordered:
+        if node.ordering is None:
+            # If this happens, session malfunctioned while applying cached results.
+            raise ValueError(
+                "Cannot use unordered cached value. Result requires ordering information."
+            )
         return compiled.OrderedIR(
             ibis_table,
             columns=tuple(

bigframes/core/indexes/base.py

Lines changed: 10 additions & 6 deletions
@@ -90,9 +90,12 @@ def __new__(
         # TODO: Support more index subtypes
         from bigframes.core.indexes.multi import MultiIndex

-        klass = MultiIndex if len(block._index_columns) > 1 else cls
-        # TODO(b/340893286): fix type error
-        result = typing.cast(Index, object.__new__(klass))  # type: ignore
+        if len(block._index_columns) <= 1:
+            klass = cls
+        else:
+            klass = MultiIndex
+
+        result = typing.cast(Index, object.__new__(klass))
         result._query_job = None
         result._block = block
         block.session._register_object(result)
@@ -161,7 +164,8 @@ def dtype(self):
     @property
     def dtypes(self) -> pandas.Series:
         return pandas.Series(
-            data=self._block.index.dtypes, index=self._block.index.names  # type:ignore
+            data=self._block.index.dtypes,
+            index=typing.cast(typing.Tuple, self._block.index.names),
         )

     @property
@@ -408,10 +412,10 @@ def drop(
         block = block.drop_columns([condition_id])
         return Index(block)

-    def dropna(self, how: str = "any") -> Index:
+    def dropna(self, how: typing.Literal["all", "any"] = "any") -> Index:
         if how not in ("any", "all"):
             raise ValueError("'how' must be one of 'any', 'all'")
-        result = block_ops.dropna(self._block, self._block.index_columns, how=how)  # type: ignore
+        result = block_ops.dropna(self._block, self._block.index_columns, how=how)
         return Index(result)

     def drop_duplicates(self, *, keep: str = "first") -> Index:
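
The __new__ rewrite keeps the same dispatch, just in a form the type checker accepts without an ignore: one index column yields a plain Index, more than one yields a MultiIndex. A small illustrative check through the public API (the frame is made up):

    import bigframes.pandas as bpd

    df = bpd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})

    plain = df.set_index("a").index         # one index column -> Index
    multi = df.set_index(["a", "b"]).index  # several index columns -> MultiIndex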

bigframes/core/nodes.py

Lines changed: 4 additions & 1 deletion
@@ -411,6 +411,7 @@ def transform_children(
         return self


+# This node shouldn't be used in the "original" expression tree, only used as replacement for original during planning
 @dataclass(frozen=True)
 class CachedTableNode(BigFrameNode):
     # The original BFET subtree that was cached
@@ -422,7 +423,7 @@ class CachedTableNode(BigFrameNode):
     table_id: str = field()
     physical_schema: Tuple[bq.SchemaField, ...] = field()

-    ordering: orderings.ExpressionOrdering = field()
+    ordering: typing.Optional[orderings.ExpressionOrdering] = field()

     @property
     def session(self):
@@ -446,6 +447,8 @@ def variables_introduced(self) -> int:
     @property
     def hidden_columns(self) -> typing.Tuple[str, ...]:
         """Physical columns used to define ordering but not directly exposed as value columns."""
+        if self.ordering is None:
+            return ()
         return tuple(
             col
             for col in sorted(self.ordering.referenced_columns)

bigframes/ml/metrics/_metrics.py

Lines changed: 3 additions & 3 deletions
@@ -227,7 +227,7 @@ def recall_score(
     y_true: Union[bpd.DataFrame, bpd.Series],
     y_pred: Union[bpd.DataFrame, bpd.Series],
     *,
-    average: str = "binary",
+    average: typing.Optional[str] = "binary",
 ) -> pd.Series:
     # TODO(ashleyxu): support more average type, default to "binary"
     if average is not None:
@@ -264,7 +264,7 @@ def precision_score(
     y_true: Union[bpd.DataFrame, bpd.Series],
     y_pred: Union[bpd.DataFrame, bpd.Series],
     *,
-    average: str = "binary",
+    average: typing.Optional[str] = "binary",
 ) -> pd.Series:
     # TODO(ashleyxu): support more average type, default to "binary"
     if average is not None:
@@ -303,7 +303,7 @@ def f1_score(
     y_true: Union[bpd.DataFrame, bpd.Series],
     y_pred: Union[bpd.DataFrame, bpd.Series],
     *,
-    average: str = "binary",
+    average: typing.Optional[str] = "binary",
 ) -> pd.Series:
     # TODO(ashleyxu): support more average type, default to "binary"
     y_true_series, y_pred_series = utils.convert_to_series(y_true, y_pred)
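
The metrics change is typing only: judging from the existing `if average is not None` checks and TODOs, average=None is the path these scorers currently implement, and Optional[str] now lets type checkers pass it without complaint. A hedged usage sketch with made-up labels:

    import bigframes.pandas as bpd
    from bigframes.ml import metrics

    y_true = bpd.Series([0, 1, 1, 0, 1])
    y_pred = bpd.Series([0, 1, 0, 0, 1])

    # average=None no longer trips the type checker; runtime behavior is unchanged.
    recall = metrics.recall_score(y_true, y_pred, average=None)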

bigframes/pandas/__init__.py

Lines changed: 26 additions & 0 deletions
@@ -699,6 +699,32 @@ def read_gbq_function(function_name: str):
 read_gbq_function.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_function)


+@typing.overload
+def to_datetime(
+    arg: vendored_pandas_datetimes.local_scalars,
+    *,
+    utc: bool = False,
+    format: Optional[str] = None,
+    unit: Optional[str] = None,
+) -> Union[pandas.Timestamp, datetime]:
+    ...
+
+
+@typing.overload
+def to_datetime(
+    arg: Union[
+        vendored_pandas_datetimes.local_iterables,
+        bigframes.series.Series,
+        bigframes.dataframe.DataFrame,
+    ],
+    *,
+    utc: bool = False,
+    format: Optional[str] = None,
+    unit: Optional[str] = None,
+) -> bigframes.series.Series:
+    ...
+
+
 def to_datetime(
     arg: Union[
         vendored_pandas_datetimes.local_scalars,
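
The two overloads let type checkers distinguish the scalar and column-like paths of bigframes.pandas.to_datetime without changing runtime behavior. A sketch of the two shapes (values are illustrative):

    import bigframes.pandas as bpd

    # Scalar input: the first overload types the result as a pandas Timestamp / datetime.
    ts = bpd.to_datetime("2024-01-15")

    # Series (or other iterable / DataFrame) input: the second overload types the
    # result as a bigframes Series.
    s = bpd.to_datetime(bpd.Series(["2024-01-15", "2024-02-01"]))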

bigframes/session/__init__.py

Lines changed: 12 additions & 2 deletions
@@ -294,6 +294,9 @@ def __init__(
         self._bytes_processed_sum = 0
         self._slot_millis_sum = 0
         self._execution_count = 0
+        # Whether this session treats objects as totally ordered.
+        # Will expose as feature later, only False for internal testing
+        self._strictly_ordered = True

     @property
     def bqclient(self):
@@ -1841,24 +1844,31 @@ def _cache_with_cluster_cols(
         """Executes the query and uses the resulting table to rewrite future executions."""
         # TODO: Use this for all executions? Problem is that caching materializes extra
         # ordering columns
+        # TODO: May want to support some partial ordering info even for non-strict ordering mode
+        keep_order_info = self._strictly_ordered
+
         compiled_value = self._compile_ordered(array_value)

         ibis_expr = compiled_value._to_ibis_expr(
-            ordering_mode="unordered", expose_hidden_cols=True
+            ordering_mode="unordered", expose_hidden_cols=keep_order_info
         )
         tmp_table = self._ibis_to_temp_table(
             ibis_expr, cluster_cols=cluster_cols, api_name="cached"
         )
         cached_replacement = array_value.as_cached(
             cache_table=self.bqclient.get_table(tmp_table),
-            ordering=compiled_value._ordering,
+            ordering=compiled_value._ordering if keep_order_info else None,
         ).node
         self._cached_executions[array_value.node] = cached_replacement

     def _cache_with_offsets(self, array_value: core.ArrayValue):
         """Executes the query and uses the resulting table to rewrite future executions."""
         # TODO: Use this for all executions? Problem is that caching materializes extra
         # ordering columns
+        if not self._strictly_ordered:
+            raise ValueError(
+                "Caching with offsets only supported in strictly ordered mode."
+            )
         compiled_value = self._compile_ordered(array_value)

         ibis_expr = compiled_value._to_ibis_expr(
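
With the internal flag in place, the two caching paths diverge: cluster-column caching still runs in unordered mode but stores ordering=None on the cached node, while offset-based caching refuses outright. A minimal sketch of the gate, assuming `session` is a bigframes.Session and `values` an ArrayValue (these are internal APIs, shown only to illustrate the behavior):

    session._strictly_ordered = False  # internal-only switch, per this change

    # Proceeds, but the CachedTableNode replacement carries ordering=None,
    # so only unordered reads can consume it.
    session._cache_with_cluster_cols(values, cluster_cols=[])

    # Rejected in unordered mode:
    # session._cache_with_offsets(values)  # raises ValueError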

tests/system/conftest.py

Lines changed: 11 additions & 0 deletions
@@ -139,6 +139,17 @@ def session() -> Generator[bigframes.Session, None, None]:
     session.close()  # close generated session at cleanup time


+@pytest.fixture(scope="session")
+def unordered_session() -> Generator[bigframes.Session, None, None]:
+    context = bigframes.BigQueryOptions(
+        location="US",
+    )
+    session = bigframes.Session(context=context)
+    session._strictly_ordered = False
+    yield session
+    session.close()  # close generated session at cleanup time
+
+
 @pytest.fixture(scope="session")
 def session_tokyo(tokyo_location: str) -> Generator[bigframes.Session, None, None]:
     context = bigframes.BigQueryOptions(
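
A test can request the new fixture like any other session-scoped fixture; a hypothetical example (the query and assertion are illustrative and not part of this commit):

    def test_unordered_session_smoke(unordered_session):
        # The fixture yields a Session with _strictly_ordered set to False.
        df = unordered_session.read_gbq("bigquery-public-data.ml_datasets.penguins")
        assert df.shape[0] > 0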
