diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 3a6008eaa8..10adc81dde 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -90,6 +90,7 @@ def __init__( allow_large_results: bool = False, ordering_mode: Literal["strict", "partial"] = "strict", client_endpoints_override: Optional[dict] = None, + enable_polars_execution: bool = False, ): self._credentials = credentials self._project = project @@ -108,6 +109,7 @@ def __init__( client_endpoints_override = {} self._client_endpoints_override = client_endpoints_override + self._enable_polars_execution = enable_polars_execution @property def application_name(self) -> Optional[str]: @@ -379,3 +381,17 @@ def client_endpoints_override(self, value: dict): ) self._client_endpoints_override = value + + @property + def enable_polars_execution(self) -> bool: + """If True, will use polars to execute some simple query plans locally.""" + return self._enable_polars_execution + + @enable_polars_execution.setter + def enable_polars_execution(self, value: bool): + if value is True: + msg = bfe.format_message( + "Polars execution is an experimental feature, and may not be stable. Must have polars installed." + ) + warnings.warn(msg, category=bfe.PreviewWarning) + self._enable_polars_execution = value diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index b2f018e80a..36a2972c1a 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -96,6 +96,10 @@ def _( return args[0] % args[1] if isinstance(op, ops.coalesce_op.__class__): return pl.coalesce(*args) + if isinstance(op, ops.isnull_op.__class__): + return args[0].is_null() + if isinstance(op, ops.notnull_op.__class__): + return args[0].is_not_null() if isinstance(op, ops.CaseWhenOp): expr = pl.when(args[0]).then(args[1]) for pred, result in zip(args[2::2], args[3::2]): @@ -184,7 +188,7 @@ class PolarsCompiler: expr_compiler = PolarsExpressionCompiler() agg_compiler = PolarsAggregateCompiler() - def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame: + def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame: if not polars_installed: raise ValueError( "Polars is not installed, cannot compile to polars engine." @@ -192,7 +196,7 @@ def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame: # TODO: Create standard way to configure BFET -> BFET rewrites # Polars has incomplete slice support in lazy mode - node = nodes.bottom_up(array_value.node, bigframes.core.rewrite.rewrite_slice) + node = nodes.bottom_up(plan, bigframes.core.rewrite.rewrite_slice) return self.compile_node(node) @functools.singledispatchmethod diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 7630e71eaa..df1389d85a 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -254,6 +254,7 @@ def __init__( storage_manager=self._temp_storage_manager, strictly_ordered=self._strictly_ordered, metrics=self._metrics, + enable_polars_execution=context.enable_polars_execution, ) self._loader = bigframes.session.loader.GbqDataLoader( session=self, diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 118838c059..0bca205381 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -38,7 +38,12 @@ import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.features -from bigframes.session import executor, local_scan_executor, read_api_execution +from bigframes.session import ( + executor, + local_scan_executor, + read_api_execution, + semi_executor, +) import bigframes.session._io.bigquery as bq_io import bigframes.session.metrics import bigframes.session.planner @@ -123,6 +128,7 @@ def __init__( *, strictly_ordered: bool = True, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + enable_polars_execution: bool = False, ): self.bqclient = bqclient self.storage_manager = storage_manager @@ -130,14 +136,21 @@ def __init__( self.cache: ExecutionCache = ExecutionCache() self.metrics = metrics self.bqstoragereadclient = bqstoragereadclient - # Simple left-to-right precedence for now - self._semi_executors = ( + self._enable_polars_execution = enable_polars_execution + self._semi_executors: Sequence[semi_executor.SemiExecutor] = ( read_api_execution.ReadApiSemiExecutor( bqstoragereadclient=bqstoragereadclient, project=self.bqclient.project, ), local_scan_executor.LocalScanExecutor(), ) + if enable_polars_execution: + from bigframes.session import polars_executor + + self._semi_executors = ( + *self._semi_executors, + polars_executor.PolarsExecutor(), + ) def to_sql( self, @@ -542,8 +555,8 @@ def _execute_plan( """Just execute whatever plan as is, without further caching or decomposition.""" # First try to execute fast-paths if not output_spec.require_bq_table: - for semi_executor in self._semi_executors: - maybe_result = semi_executor.execute(plan, ordered=ordered, peek=peek) + for exec in self._semi_executors: + maybe_result = exec.execute(plan, ordered=ordered, peek=peek) if maybe_result: return maybe_result diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py new file mode 100644 index 0000000000..80d59f5285 --- /dev/null +++ b/bigframes/session/polars_executor.py @@ -0,0 +1,66 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import Optional, TYPE_CHECKING + +from bigframes.core import bigframe_node, nodes +from bigframes.session import executor, semi_executor + +if TYPE_CHECKING: + import polars as pl + + +_COMPATIBLE_NODES = ( + nodes.ReadLocalNode, + nodes.OrderByNode, + nodes.ReversedNode, + nodes.SelectionNode, + nodes.FilterNode, # partial support + nodes.ProjectionNode, # partial support +) + + +class PolarsExecutor(semi_executor.SemiExecutor): + def __init__(self): + # This will error out if polars is not installed + from bigframes.core.compile.polars import PolarsCompiler + + self._compiler = PolarsCompiler() + + def execute( + self, + plan: bigframe_node.BigFrameNode, + ordered: bool, + peek: Optional[int] = None, + ) -> Optional[executor.ExecuteResult]: + if not self._can_execute(plan): + return None + # Note: Ignoring ordered flag, as just executing totally ordered is fine. + try: + lazy_frame: pl.LazyFrame = self._compiler.compile(plan) + except Exception: + return None + if peek is not None: + lazy_frame = lazy_frame.limit(peek) + pa_table = lazy_frame.collect().to_arrow() + return executor.ExecuteResult( + arrow_batches=iter(pa_table.to_batches()), + schema=plan.schema, + total_bytes=pa_table.nbytes, + total_rows=pa_table.num_rows, + ) + + def _can_execute(self, plan: bigframe_node.BigFrameNode): + return all(isinstance(node, _COMPATIBLE_NODES) for node in plan.unique_nodes()) diff --git a/tests/system/small/test_polars_execution.py b/tests/system/small/test_polars_execution.py new file mode 100644 index 0000000000..e0e9a751c1 --- /dev/null +++ b/tests/system/small/test_polars_execution.py @@ -0,0 +1,74 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest + +import bigframes +from tests.system.utils import assert_pandas_df_equal + +polars = pytest.importorskip("polars", reason="polars is required for this test") + + +@pytest.fixture(scope="module") +def session_w_polars(): + context = bigframes.BigQueryOptions(location="US", enable_polars_execution=True) + session = bigframes.Session(context=context) + yield session + session.close() # close generated session at cleanup time + + +def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index): + execution_count_before = session_w_polars._metrics.execution_count + bf_df = session_w_polars.read_pandas(scalars_pandas_df_index) + + pd_result = scalars_pandas_df_index.sort_index(ascending=False)[ + ["int64_too", "bool_col"] + ] + bf_result = bf_df.sort_index(ascending=False)[["int64_too", "bool_col"]].to_pandas() + + assert session_w_polars._metrics.execution_count == execution_count_before + assert_pandas_df_equal(bf_result, pd_result) + + +def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_index): + execution_count_before = session_w_polars._metrics.execution_count + bf_df = session_w_polars.read_pandas(scalars_pandas_df_index) + + pd_result = scalars_pandas_df_index.sort_index(ascending=False).dropna( + subset=["int64_col", "string_col"] + ) + bf_result = ( + bf_df.sort_index(ascending=False) + .dropna(subset=["int64_col", "string_col"]) + .to_pandas() + ) + + assert session_w_polars._metrics.execution_count == execution_count_before + assert_pandas_df_equal(bf_result, pd_result) + + +def test_polar_execution_unsupported_sql_fallback( + session_w_polars, scalars_pandas_df_index +): + execution_count_before = session_w_polars._metrics.execution_count + bf_df = session_w_polars.read_pandas(scalars_pandas_df_index) + + pd_df = scalars_pandas_df_index.copy() + pd_df["str_len_col"] = pd_df.string_col.str.len() + pd_result = pd_df + + bf_df["str_len_col"] = bf_df.string_col.str.len() + bf_result = bf_df.to_pandas() + + assert session_w_polars._metrics.execution_count == (execution_count_before + 1) + assert_pandas_df_equal(bf_result, pd_result) diff --git a/tests/unit/polars_session.py b/tests/unit/polars_session.py index d592b49038..61daab9a8b 100644 --- a/tests/unit/polars_session.py +++ b/tests/unit/polars_session.py @@ -46,7 +46,7 @@ def execute( """ Execute the ArrayValue, storing the result to a temporary session-owned table. """ - lazy_frame: polars.LazyFrame = self.compiler.compile(array_value) + lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node) pa_table = lazy_frame.collect().to_arrow() # Currently, pyarrow types might not quite be exactly the ones in the bigframes schema. # Nullability may be different, and might use large versions of list, string datatypes.