Skip to content

feat: Add experimental polars execution #1747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions bigframes/_config/bigquery_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(
allow_large_results: bool = False,
ordering_mode: Literal["strict", "partial"] = "strict",
client_endpoints_override: Optional[dict] = None,
enable_polars_execution: bool = False,
):
self._credentials = credentials
self._project = project
Expand All @@ -108,6 +109,7 @@ def __init__(
client_endpoints_override = {}

self._client_endpoints_override = client_endpoints_override
self._enable_polars_execution = enable_polars_execution

@property
def application_name(self) -> Optional[str]:
Expand Down Expand Up @@ -379,3 +381,17 @@ def client_endpoints_override(self, value: dict):
)

self._client_endpoints_override = value

@property
def enable_polars_execution(self) -> bool:
    """Whether some simple query plans may be executed locally with polars.

    Returns:
        bool: The currently stored flag; when True, polars is used to
        execute some simple query plans locally.
    """
    polars_enabled = self._enable_polars_execution
    return polars_enabled

@enable_polars_execution.setter
def enable_polars_execution(self, value: bool):
    """Store the polars-execution flag, warning when it is switched on.

    Args:
        value: New flag value. Only the literal ``True`` triggers the
            preview warning; the value is stored unchanged either way.
    """
    # Identity check on purpose: the warning fires for exactly ``True``.
    if value is True:
        warnings.warn(
            bfe.format_message(
                "Polars execution is an experimental feature, and may not be stable. Must have polars installed."
            ),
            category=bfe.PreviewWarning,
        )
    self._enable_polars_execution = value
8 changes: 6 additions & 2 deletions bigframes/core/compile/polars/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ def _(
return args[0] % args[1]
if isinstance(op, ops.coalesce_op.__class__):
return pl.coalesce(*args)
if isinstance(op, ops.isnull_op.__class__):
return args[0].is_null()
if isinstance(op, ops.notnull_op.__class__):
return args[0].is_not_null()
if isinstance(op, ops.CaseWhenOp):
expr = pl.when(args[0]).then(args[1])
for pred, result in zip(args[2::2], args[3::2]):
Expand Down Expand Up @@ -184,15 +188,15 @@ class PolarsCompiler:
expr_compiler = PolarsExpressionCompiler()
agg_compiler = PolarsAggregateCompiler()

def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame:
def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame:
if not polars_installed:
raise ValueError(
"Polars is not installed, cannot compile to polars engine."
)

# TODO: Create standard way to configure BFET -> BFET rewrites
# Polars has incomplete slice support in lazy mode
node = nodes.bottom_up(array_value.node, bigframes.core.rewrite.rewrite_slice)
node = nodes.bottom_up(plan, bigframes.core.rewrite.rewrite_slice)
return self.compile_node(node)

@functools.singledispatchmethod
Expand Down
1 change: 1 addition & 0 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ def __init__(
storage_manager=self._temp_storage_manager,
strictly_ordered=self._strictly_ordered,
metrics=self._metrics,
enable_polars_execution=context.enable_polars_execution,
)
self._loader = bigframes.session.loader.GbqDataLoader(
session=self,
Expand Down
23 changes: 18 additions & 5 deletions bigframes/session/bq_caching_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.features
from bigframes.session import executor, local_scan_executor, read_api_execution
from bigframes.session import (
executor,
local_scan_executor,
read_api_execution,
semi_executor,
)
import bigframes.session._io.bigquery as bq_io
import bigframes.session.metrics
import bigframes.session.planner
Expand Down Expand Up @@ -123,21 +128,29 @@ def __init__(
*,
strictly_ordered: bool = True,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
enable_polars_execution: bool = False,
):
self.bqclient = bqclient
self.storage_manager = storage_manager
self.strictly_ordered: bool = strictly_ordered
self.cache: ExecutionCache = ExecutionCache()
self.metrics = metrics
self.bqstoragereadclient = bqstoragereadclient
# Simple left-to-right precedence for now
self._semi_executors = (
self._enable_polars_execution = enable_polars_execution
self._semi_executors: Sequence[semi_executor.SemiExecutor] = (
read_api_execution.ReadApiSemiExecutor(
bqstoragereadclient=bqstoragereadclient,
project=self.bqclient.project,
),
local_scan_executor.LocalScanExecutor(),
)
if enable_polars_execution:
from bigframes.session import polars_executor

self._semi_executors = (
*self._semi_executors,
polars_executor.PolarsExecutor(),
)

def to_sql(
self,
Expand Down Expand Up @@ -542,8 +555,8 @@ def _execute_plan(
"""Just execute whatever plan as is, without further caching or decomposition."""
# First try to execute fast-paths
if not output_spec.require_bq_table:
for semi_executor in self._semi_executors:
maybe_result = semi_executor.execute(plan, ordered=ordered, peek=peek)
for exec in self._semi_executors:
maybe_result = exec.execute(plan, ordered=ordered, peek=peek)
if maybe_result:
return maybe_result

Expand Down
66 changes: 66 additions & 0 deletions bigframes/session/polars_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from typing import Optional, TYPE_CHECKING

from bigframes.core import bigframe_node, nodes
from bigframes.session import executor, semi_executor

if TYPE_CHECKING:
import polars as pl


# Node types the polars executor is willing to attempt locally.
# PolarsExecutor._can_execute accepts a plan only when every one of its
# unique nodes is an instance of one of these types. Entries marked
# "partial support" can still fail at compile time, in which case
# PolarsExecutor.execute returns None instead of a result.
_COMPATIBLE_NODES = (
nodes.ReadLocalNode,
nodes.OrderByNode,
nodes.ReversedNode,
nodes.SelectionNode,
nodes.FilterNode, # partial support
nodes.ProjectionNode, # partial support
)


class PolarsExecutor(semi_executor.SemiExecutor):
    """Semi-executor that runs eligible plans locally through polars.

    A plan is eligible when all of its unique nodes are instances of the
    types listed in ``_COMPATIBLE_NODES``. Ineligible plans, and plans the
    polars compiler fails to compile, yield ``None`` from ``execute``.
    """

    def __init__(self):
        # Imported lazily; this is expected to error out if polars is not
        # installed.
        from bigframes.core.compile.polars import PolarsCompiler

        self._compiler = PolarsCompiler()

    def execute(
        self,
        plan: bigframe_node.BigFrameNode,
        ordered: bool,
        peek: Optional[int] = None,
    ) -> Optional[executor.ExecuteResult]:
        """Try to execute ``plan`` with polars.

        Args:
            plan: Root node of the plan to execute.
            ordered: Ignored; the plan is always executed totally ordered.
            peek: When not None, the lazy frame is limited to this many rows
                before it is collected.

        Returns:
            An ``ExecuteResult`` built from the collected arrow table, or
            ``None`` when the plan contains an unsupported node type or the
            compiler raises while compiling it.
        """
        if not self._can_execute(plan):
            return None

        # Compilation failures are treated as "cannot handle this plan"
        # rather than as errors.
        try:
            compiled: pl.LazyFrame = self._compiler.compile(plan)
        except Exception:
            return None

        limited = compiled if peek is None else compiled.limit(peek)
        arrow_table = limited.collect().to_arrow()
        return executor.ExecuteResult(
            arrow_batches=iter(arrow_table.to_batches()),
            schema=plan.schema,
            total_bytes=arrow_table.nbytes,
            total_rows=arrow_table.num_rows,
        )

    def _can_execute(self, plan: bigframe_node.BigFrameNode):
        """Return True only if every unique node of ``plan`` is a compatible type."""
        for candidate in plan.unique_nodes():
            if not isinstance(candidate, _COMPATIBLE_NODES):
                return False
        return True
74 changes: 74 additions & 0 deletions tests/system/small/test_polars_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

import bigframes
from tests.system.utils import assert_pandas_df_equal

polars = pytest.importorskip("polars", reason="polars is required for this test")


@pytest.fixture(scope="module")
def session_w_polars():
    """Yield a module-scoped session created with polars execution enabled.

    The session is closed after the module's tests have finished with it.
    """
    options = bigframes.BigQueryOptions(location="US", enable_polars_execution=True)
    polars_session = bigframes.Session(context=options)
    yield polars_session
    # Teardown: close the session this fixture created.
    polars_session.close()


def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index):
    """A sorted column selection must not increase the session's execution count."""
    executions_at_start = session_w_polars._metrics.execution_count
    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)
    selected = ("int64_too", "bool_col")

    pd_sorted = scalars_pandas_df_index.sort_index(ascending=False)
    expected = pd_sorted[list(selected)]

    bf_sorted = bf_df.sort_index(ascending=False)
    actual = bf_sorted[list(selected)].to_pandas()

    # The execution counter is expected to stay where it started.
    assert session_w_polars._metrics.execution_count == executions_at_start
    assert_pandas_df_equal(actual, expected)


def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_index):
    """Sorting followed by dropna must not increase the session's execution count."""
    executions_at_start = session_w_polars._metrics.execution_count
    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)
    required = ("int64_col", "string_col")

    pd_sorted = scalars_pandas_df_index.sort_index(ascending=False)
    expected = pd_sorted.dropna(subset=list(required))

    bf_sorted = bf_df.sort_index(ascending=False)
    bf_filtered = bf_sorted.dropna(subset=list(required))
    actual = bf_filtered.to_pandas()

    # The execution counter is expected to stay where it started.
    assert session_w_polars._metrics.execution_count == executions_at_start
    assert_pandas_df_equal(actual, expected)


def test_polar_execution_unsupported_sql_fallback(
    session_w_polars, scalars_pandas_df_index
):
    """Adding a string-length column is expected to add exactly one execution."""
    executions_at_start = session_w_polars._metrics.execution_count
    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

    expected = scalars_pandas_df_index.copy()
    expected["str_len_col"] = expected.string_col.str.len()

    bf_df["str_len_col"] = bf_df.string_col.str.len()
    actual = bf_df.to_pandas()

    # Exactly one additional execution should have been recorded.
    assert session_w_polars._metrics.execution_count == (executions_at_start + 1)
    assert_pandas_df_equal(actual, expected)
2 changes: 1 addition & 1 deletion tests/unit/polars_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def execute(
"""
Execute the ArrayValue, storing the result to a temporary session-owned table.
"""
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
pa_table = lazy_frame.collect().to_arrow()
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
Expand Down