diff --git a/bigframes/core/bigframe_node.py b/bigframes/core/bigframe_node.py index 369e8f6329..96b76159b2 100644 --- a/bigframes/core/bigframe_node.py +++ b/bigframes/core/bigframe_node.py @@ -211,9 +211,11 @@ def explicitly_ordered(self) -> bool: @functools.cached_property def height(self) -> int: - if len(self.child_nodes) == 0: - return 0 - return max(child.height for child in self.child_nodes) + 1 + from bigframes.core import tree_properties + + return tree_properties.GLOBAL_PROPERTY_SCOPE.get_property( + self, tree_properties.HEIGHT_PROP + ) @functools.cached_property def total_variables(self) -> int: diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index 82df53af82..fd75ebb15c 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -13,13 +13,154 @@ # limitations under the License. from __future__ import annotations +import abc +import dataclasses import functools import itertools -from typing import Callable, Dict, Optional, Sequence +from typing import ( + Any, + Callable, + Dict, + Generator, + Generic, + Hashable, + List, + Optional, + Sequence, + Tuple, + TypeVar, +) +import weakref import bigframes.core.nodes as nodes +@dataclasses.dataclass(frozen=True) +class GetPropRequest: + property: Property + node: nodes.BigFrameNode + + +T = TypeVar("T", bound=Hashable) + + +@dataclasses.dataclass(frozen=True) +class Property(abc.ABC, Generic[T]): + id: str + + @abc.abstractmethod + def get(self, node: nodes.BigFrameNode) -> Generator[GetPropRequest, Any, T]: + ... + + +@dataclasses.dataclass(frozen=True) +class Height(Property[int]): + id: str = "Height" + + def get(self, node: nodes.BigFrameNode) -> Generator[GetPropRequest, int, int]: + if len(node.child_nodes) == 0: + return 0 + heights = [] + for child in node.child_nodes: + child_height = yield GetPropRequest(self, child) + heights.append(child_height) + return min(heights) + 1 + + +class PropertyScope: + _memoized: weakref.WeakKeyDictionary[ + nodes.BigFrameNode, Dict[Property, Any] + ] = weakref.WeakKeyDictionary() + + def get_property( + self, node: nodes.BigFrameNode, property_instance: Property[T] + ) -> T: + """ + Gets the value of a property for a given node, using memoization and + an iterative approach with an explicit stack to avoid deep Python recursion + in this method. + + Args: + node: The node for which to compute the property. + property_instance: The specific Property object (e.g., an instance of Height). + + Returns: + The computed value of the property for the node. + + Raises: + RecursionError: If a cycle in property dependencies is detected. + TypeError: If a property's get() method yields an unexpected value type. + Exception: Propagates exceptions raised during property computation. + """ + # Stack to manage generators that are waiting for a dependency result. + generator_stack: List[Tuple[Generator, nodes.BigFrameNode, Property]] = [] + + # Value computed by the most recent dependency, to be sent into the parent generator. + result_to_send: Any = None + + # The current node and property being processed. Start with the initial request. + current_node: nodes.BigFrameNode = node + current_prop: Property = property_instance + current_gen: Optional[Generator[Any, None, None]] = None + + while True: + if self._cache_contains(current_node, current_prop): + result_to_send = self._get_cache(current_node, current_prop) + if not generator_stack: + # No generators are waiting, this is the final result. + return result_to_send + else: + current_gen, current_node, current_prop = generator_stack.pop() + continue + elif current_gen is None: + current_gen = current_prop.get(current_node) + result_to_send = None # Send None initially, gen.send(None) = next(gen) + try: + # Send the result from a completed dependency (or None initially). + request = current_gen.send(result_to_send) + result_to_send = None # Consume the result after sending + + # Generator yielded a request for another property. + assert isinstance(request, GetPropRequest) + # Push the current generator onto the stack, as it's now waiting. + generator_stack.append((current_gen, current_node, current_prop)) + # Set the new dependency as the current task. + current_node, current_prop = request.node, request.property + current_gen = None # Clear current generator before looping + continue + + except StopIteration as e: + self._insert_cache(current_node, current_prop, e.value) + result_to_send = e.value + # Check if this completes the overall request or a dependency. + if not generator_stack: + return result_to_send # Final result + else: + current_gen, current_node, current_prop = generator_stack.pop() + continue + + def _insert_cache(self, node, property, value): + if node not in self._memoized: + self._memoized[node] = {} + self._memoized[node][property] = value + + def _cache_contains(self, node, property: Property[T]) -> bool: + node_cache = self._memoized.get(node) + if node_cache is not None and property in node_cache: + return True + return False + + def _get_cache(self, node, property: Property[T]) -> T: + node_cache = self._memoized.get(node) + if node_cache is not None and property in node_cache: + return node_cache[property] # Get cached result + raise ValueError("no value") + + +HEIGHT_PROP = Height() +GLOBAL_PROPERTY_SCOPE = PropertyScope() + + def is_trivially_executable(node: nodes.BigFrameNode) -> bool: if local_only(node): return True