From 6e7d3b1e3b04e59966754b6337e25b38dec0cccc Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 25 Mar 2024 19:45:42 +0000 Subject: [PATCH 1/3] fix: Fix case where df.peek would fail to execute even with force=True --- bigframes/core/blocks.py | 3 +- bigframes/core/nodes.py | 37 +------------------ .../core/{traversal.py => tree_properties.py} | 13 +++++++ bigframes/session/__init__.py | 5 ++- tests/system/small/test_dataframe.py | 11 ++++++ 5 files changed, 30 insertions(+), 39 deletions(-) rename bigframes/core/{traversal.py => tree_properties.py} (71%) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index afa13375b1..d030a4398a 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -41,6 +41,7 @@ import bigframes.core.guid as guid import bigframes.core.join_def as join_defs import bigframes.core.ordering as ordering +import bigframes.core.tree_properties as tree_properties import bigframes.core.utils import bigframes.core.utils as utils import bigframes.dtypes @@ -444,7 +445,7 @@ def to_pandas( return df, query_job def try_peek(self, n: int = 20) -> typing.Optional[pd.DataFrame]: - if self.expr.node.peekable: + if tree_properties.peekable(self.expr.node): iterator, _ = self.session._peek(self.expr, n) df = self._to_dataframe(iterator) self._copy_index_to_pandas(df) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 5ebd2a5997..e88b9ccdc6 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -90,11 +90,6 @@ def session(self): def _node_hash(self): return hash(tuple(hash(getattr(self, field.name)) for field in fields(self))) - @property - def peekable(self) -> bool: - """Indicates whether the node can be sampled efficiently""" - return all(child.peekable for child in self.child_nodes) - @property def roots(self) -> typing.Set[BigFrameNode]: roots = itertools.chain.from_iterable( @@ -143,12 +138,6 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def __hash__(self): return self._node_hash - @property - def peekable(self) -> bool: - children_peekable = all(child.peekable for child in self.child_nodes) - single_root = len(self.roots) == 1 - return children_peekable and single_root - @functools.cached_property def schema(self) -> schemata.ArraySchema: def join_mapping_to_schema_item(mapping: JoinColumnMapping): @@ -204,10 +193,6 @@ class ReadLocalNode(BigFrameNode): def __hash__(self): return self._node_hash - @property - def peekable(self) -> bool: - return True - @property def roots(self) -> typing.Set[BigFrameNode]: return {self} @@ -233,10 +218,6 @@ def session(self): def __hash__(self): return self._node_hash - @property - def peekable(self) -> bool: - return True - @property def roots(self) -> typing.Set[BigFrameNode]: return {self} @@ -261,13 +242,9 @@ class PromoteOffsetsNode(UnaryNode): def __hash__(self): return self._node_hash - @property - def peekable(self) -> bool: - return False - @property def non_local(self) -> bool: - return False + return True @property def schema(self) -> schemata.ArraySchema: @@ -365,10 +342,6 @@ def row_preserving(self) -> bool: def __hash__(self): return self._node_hash - @property - def peekable(self) -> bool: - return False - @property def non_local(self) -> bool: return True @@ -401,10 +374,6 @@ class WindowOpNode(UnaryNode): def __hash__(self): return self._node_hash - @property - def peekable(self) -> bool: - return False - @property def non_local(self) -> bool: return True @@ -453,10 +422,6 @@ def row_preserving(self) -> bool: def non_local(self) -> bool: return True - @property - def peekable(self) -> bool: - return False - @functools.cached_property def schema(self) -> schemata.ArraySchema: def infer_dtype( diff --git a/bigframes/core/traversal.py b/bigframes/core/tree_properties.py similarity index 71% rename from bigframes/core/traversal.py rename to bigframes/core/tree_properties.py index b038ee6599..9fffdee80f 100644 --- a/bigframes/core/traversal.py +++ b/bigframes/core/tree_properties.py @@ -12,8 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import functools + import bigframes.core.nodes as nodes +# TODO: Convert these functions to iterative or enforce hard limit on tree depth. The below algorithms can cause stack to exceed limit. + def is_trivially_executable(node: nodes.BigFrameNode) -> bool: if local_only(node): @@ -25,3 +29,12 @@ def is_trivially_executable(node: nodes.BigFrameNode) -> bool: def local_only(node: nodes.BigFrameNode) -> bool: return all(isinstance(node, nodes.ReadLocalNode) for node in node.roots) + + +@functools.cache +def peekable(node: nodes.BigFrameNode) -> bool: + if local_only(node): + return True + children_peekable = all(peekable(child) for child in node.child_nodes) + self_peekable = not node.non_local + return children_peekable and self_peekable diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 479b3a7bac..54cd269ec1 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -82,7 +82,8 @@ import bigframes.core.guid as guid from bigframes.core.ordering import IntegerEncoding import bigframes.core.ordering as order -import bigframes.core.traversal as traversals +import bigframes.core.tree_properties as traversals +import bigframes.core.tree_properties as tree_properties import bigframes.core.utils as utils import bigframes.dtypes import bigframes.formatting_helpers as formatting_helpers @@ -1848,7 +1849,7 @@ def _peek( self, array_value: core.ArrayValue, n_rows: int ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]: """A 'peek' efficiently accesses a small number of rows in the dataframe.""" - if not array_value.node.peekable: + if not tree_properties.peekable(array_value.node): raise NotImplementedError("cannot efficient peek this dataframe") sql = self._compile_unordered(array_value).peek_sql(n_rows) return self._start_query( diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 99ee6680fa..96665f2f46 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -494,6 +494,17 @@ def test_df_peek_force_default(scalars_dfs): assert len(peek_result) == 3 +def test_df_peek_reset_index(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + peek_result = ( + scalars_df[["int64_col", "int64_too"]].reset_index(drop=True).peek(n=3) + ) + pd.testing.assert_index_equal( + scalars_pandas_df[["int64_col", "int64_too"]].columns, peek_result.columns + ) + assert len(peek_result) == 3 + + def test_repr_w_all_rows(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs From 7c7df968eb7fa851bf3e2fdcf309336ffebe92ee Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 26 Mar 2024 01:41:27 +0000 Subject: [PATCH 2/3] remove cache from peekable property --- bigframes/core/tree_properties.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index 9fffdee80f..bc29f115f6 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import functools import bigframes.core.nodes as nodes @@ -31,7 +30,6 @@ def local_only(node: nodes.BigFrameNode) -> bool: return all(isinstance(node, nodes.ReadLocalNode) for node in node.roots) -@functools.cache def peekable(node: nodes.BigFrameNode) -> bool: if local_only(node): return True From 6b93af613f1b7c345e58df74d1dd73c42f7a57db Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 27 Mar 2024 00:20:01 +0000 Subject: [PATCH 3/3] if force=True always peek after caching even if peeking inefficient --- bigframes/core/blocks.py | 6 ++++-- bigframes/dataframe.py | 2 +- bigframes/session/__init__.py | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index d030a4398a..11899eef11 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -444,8 +444,10 @@ def to_pandas( df.set_axis(self.column_labels, axis=1, copy=False) return df, query_job - def try_peek(self, n: int = 20) -> typing.Optional[pd.DataFrame]: - if tree_properties.peekable(self.expr.node): + def try_peek( + self, n: int = 20, force: bool = False + ) -> typing.Optional[pd.DataFrame]: + if force or tree_properties.peekable(self.expr.node): iterator, _ = self.session._peek(self.expr, n) df = self._to_dataframe(iterator) self._copy_index_to_pandas(df) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 7e82ba125c..f22e9c7867 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1120,7 +1120,7 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame: if maybe_result is None: if force: self._cached() - maybe_result = self._block.try_peek(n) + maybe_result = self._block.try_peek(n, force=True) assert maybe_result is not None else: raise ValueError( diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 54cd269ec1..ddb2646ce6 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1850,7 +1850,7 @@ def _peek( ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]: """A 'peek' efficiently accesses a small number of rows in the dataframe.""" if not tree_properties.peekable(array_value.node): - raise NotImplementedError("cannot efficient peek this dataframe") + warnings.warn("Peeking this value cannot be done efficiently.") sql = self._compile_unordered(array_value).peek_sql(n_rows) return self._start_query( sql=sql,