Skip to content

feat: add DataFrame.eval, DataFrame.query #361

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
fb7eb1d
feat: add DataFrame.eval, DataFrame.query
TrevorBergeron Jan 18, 2024
7e5d266
address pr comments
TrevorBergeron Jan 30, 2024
6069ced
Merge branch 'main' into eval2
TrevorBergeron Jan 30, 2024
78dad7c
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 6, 2024
4d85e0e
add docstring, disable new tests for legacy pandas
TrevorBergeron Mar 6, 2024
8d708a9
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 21, 2024
2b0d902
vendor the pandas eval implementation
TrevorBergeron Mar 21, 2024
8cec454
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 21, 2024
7b3a4ca
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 21, 2024
fc4b26a
amend eval docstring
TrevorBergeron Mar 21, 2024
59888b4
fix doctest expectation
TrevorBergeron Mar 21, 2024
79fe94f
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 21, 2024
ca5b670
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 22, 2024
838ff14
amend doctest
TrevorBergeron Mar 22, 2024
b9de8f9
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 22, 2024
b143097
pr comments
TrevorBergeron Mar 23, 2024
89f6abb
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 23, 2024
6887fc7
Fix doctest for eval
TrevorBergeron Mar 23, 2024
29f0fd2
Merge remote-tracking branch 'github/main' into eval2
TrevorBergeron Mar 23, 2024
f8dcce1
Merge branch 'main' into eval2
TrevorBergeron Mar 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions bigframes/core/eval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dataclasses
from typing import Optional

import bigframes_vendored.pandas.core.computation.eval as vendored_pandas_eval
import bigframes_vendored.pandas.core.computation.parsing as vendored_pandas_eval_parsing

import bigframes.dataframe as dataframe
import bigframes.dtypes
import bigframes.series as series


def eval(df: dataframe.DataFrame, expr: str, target: Optional[dataframe.DataFrame]):
    """
    Evaluate a python expression against the columns and index of a DataFrame.

    Args:
        df (DataFrame):
            Columns of this dataframe will be used to resolve variables in expression.
        expr (str):
            One or more python expression to evaluate.
        target (DataFrame or None):
            The evaluation result will be written to the target if provided.

    Returns:
        Result of evaluation.
    """
    clean = vendored_pandas_eval_parsing.clean_column_name

    # Make every index level resolvable by (cleaned) name.
    index_resolver = {}
    for level, name in enumerate(df.index.names):
        level_series = df.index.get_level_values(level).to_series()
        index_resolver[clean(str(name))] = EvalSeries(level_series)

    # Make every column resolvable by (cleaned) name; columns shadow index
    # levels because their resolver is listed later.
    column_resolver = {}
    for label, column in df.items():
        column_resolver[clean(str(label))] = EvalSeries(column)

    # 3 Levels: user -> logging wrapper -> dataframe -> eval helper (this)
    return vendored_pandas_eval.eval(
        expr=expr, level=3, target=target, resolvers=(index_resolver, column_resolver)  # type: ignore
    )


@dataclasses.dataclass
class FakeNumpyArray:
    """Minimal numpy-array stand-in exposing only ``dtype``, which is the
    only attribute the vendored pandas eval machinery inspects when
    inferring the result schema (see ``EvalSeries.values``)."""

    dtype: bigframes.dtypes.Dtype


class EvalSeries(series.Series):
    """Series variant adapted so pandas.eval can inspect it without I/O."""

    def __init__(self, underlying: series.Series):
        # Re-wrap the same underlying block; no data is copied or downloaded.
        super().__init__(data=underlying._block)

    @property
    def values(self):
        """A dtype-only numpy-array stand-in.

        Lets the vendored eval determine the output schema without actually
        downloading the data.
        """
        return FakeNumpyArray(dtype=self.dtype)
11 changes: 11 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,17 @@ def sort_values(
)
return DataFrame(self._block.order_by(ordering))

def eval(self, expr: str) -> DataFrame:
    """Evaluate ``expr`` over this DataFrame's columns and index.

    Mirrors ``pandas.DataFrame.eval``; assignment expressions add columns.
    """
    # Imported lazily to avoid a circular import at module load time.
    import bigframes.core.eval as bf_eval

    # Passing self as target enables assignment expressions; per the vendored
    # implementation the target is copied when inplace=False, so self is not
    # mutated.
    return bf_eval.eval(self, expr, target=self)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we only set target if inplace=True?

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.eval.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eval uses .copy() on the target if inplace=False


def query(self, expr: str) -> DataFrame:
    """Filter rows using a boolean expression, as in ``pandas.DataFrame.query``."""
    # Imported lazily to avoid a circular import at module load time.
    import bigframes.core.eval as bf_eval

    mask = bf_eval.eval(self, expr, target=None)
    return self[mask]

def value_counts(
self,
subset: typing.Union[blocks.Label, typing.Sequence[blocks.Label]] = None,
Expand Down
38 changes: 38 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3822,6 +3822,44 @@ def test_df_to_orc(scalars_df_index, scalars_pandas_df_index):
assert bf_result == pd_result


@skip_legacy_pandas
@pytest.mark.parametrize(
    ("expr",),
    [
        ("new_col = int64_col + int64_too",),
        ("new_col = (rowindex > 3) | bool_col",),
        ("int64_too = bool_col\nnew_col2 = rowindex",),
    ],
)
def test_df_eval(scalars_dfs, expr):
    # Compare bigframes eval against the pandas reference implementation.
    scalars_df, scalars_pandas_df = scalars_dfs

    pd_result = scalars_pandas_df.eval(expr)
    bf_result = scalars_df.eval(expr).to_pandas()

    pd.testing.assert_frame_equal(bf_result, pd_result)


@skip_legacy_pandas
@pytest.mark.parametrize(
    ("expr",),
    [
        ("int64_col > int64_too",),
        ("bool_col",),
        ("((int64_col - int64_too) % @local_var) == 0",),
    ],
)
def test_df_query(scalars_dfs, expr):
    # local_var is referenced in expressions: eval/query walks up the stack
    # frames and resolves @-prefixed names from the caller's locals.
    local_var = 3  # NOQA
    scalars_df, scalars_pandas_df = scalars_dfs

    pd_result = scalars_pandas_df.query(expr)
    bf_result = scalars_df.query(expr).to_pandas()

    pd.testing.assert_frame_equal(bf_result, pd_result)


@pytest.mark.parametrize(
("subset", "normalize", "ascending", "dropna"),
[
Expand Down
26 changes: 26 additions & 0 deletions third_party/bigframes_vendored/pandas/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from typing import Callable, TYPE_CHECKING

from bigframes_vendored.pandas.core.dtypes.inference import iterable_not_string

if TYPE_CHECKING:
from bigframes_vendored.pandas.pandas._typing import T

Expand Down Expand Up @@ -40,3 +42,27 @@ def pipe(
return func(*args, **kwargs)
else:
return func(obj, *args, **kwargs)


def flatten(line):
    """
    Recursively flatten an arbitrarily nested sequence.

    Parameters
    ----------
    line : sequence
        The non string sequence to flatten

    Notes
    -----
    Strings are treated as leaves and are not iterated into.

    Returns
    -------
    flattened : generator
    """
    for element in line:
        if not iterable_not_string(element):
            # Leaf (string or non-iterable): emit as-is.
            yield element
        else:
            yield from flatten(element)
226 changes: 226 additions & 0 deletions third_party/bigframes_vendored/pandas/core/computation/align.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/computation/align.py
"""
Core eval alignment algorithms.
"""
from __future__ import annotations

from functools import partial, wraps
from typing import Callable, TYPE_CHECKING
import warnings

import bigframes_vendored.pandas.core.common as com
from bigframes_vendored.pandas.core.computation.common import result_type_many
from bigframes_vendored.pandas.util._exceptions import find_stack_level
import numpy as np
from pandas.errors import PerformanceWarning

if TYPE_CHECKING:
from collections.abc import Sequence

from bigframes_vendored.pandas.core.generic import NDFrame
from bigframes_vendored.pandas.core.indexes.base import Index
from pandas._typing import F


def _align_core_single_unary_op(
term,
) -> tuple[partial | type[NDFrame], dict[str, Index] | None]:
typ: partial | type[NDFrame]
axes: dict[str, Index] | None = None

if isinstance(term.value, np.ndarray):
typ = partial(np.asanyarray, dtype=term.value.dtype)
else:
typ = type(term.value)
if hasattr(term.value, "axes"):
axes = _zip_axes_from_type(typ, term.value.axes)

return typ, axes


def _zip_axes_from_type(
typ: type[NDFrame], new_axes: Sequence[Index]
) -> dict[str, Index]:
return {name: new_axes[i] for i, name in enumerate(typ._AXIS_ORDERS)}


def _any_pandas_objects(terms) -> bool:
    """
    Check a sequence of terms for instances of PandasObject.
    """
    for term in terms:
        if is_pandas_object(term.value):
            return True
    return False


def _filter_special_cases(f) -> Callable[[F], F]:
    """Decorator that short-circuits trivial alignment cases before calling ``f``."""

    @wraps(f)
    def wrapper(terms):
        # single unary operand
        if len(terms) == 1:
            return _align_core_single_unary_op(terms[0])

        # we don't have any pandas objects: pure numpy/scalar type resolution
        if not _any_pandas_objects(terms):
            values = (term.value for term in terms)
            return result_type_many(*values), None

        # General case: delegate to the wrapped alignment routine.
        return f(terms)

    return wrapper


@_filter_special_cases
def _align_core(terms):
    """
    Align pandas/bigframes terms onto a shared set of axes.

    Outer-joins the axes of all axed terms, reindexes each term to the joined
    axes (warning when the alignment is more than an order of magnitude of
    difference on a large axis), and returns the result constructor type
    together with the aligned axes keyed by axis name.
    """
    term_index = [i for i, term in enumerate(terms) if hasattr(term.value, "axes")]
    term_dims = [terms[i].value.ndim for i in term_index]

    from pandas import Series

    ndims = Series(dict(zip(term_index, term_dims)))

    # initial axes are the axes of the largest-axis'd term
    biggest = terms[ndims.idxmax()].value
    typ = biggest._constructor
    axes = biggest.axes
    naxes = len(axes)
    gt_than_one_axis = naxes > 1

    # Outer-join every term's axes into the shared `axes`.
    for value in (terms[i].value for i in term_index):
        value_is_series = is_series(value)
        is_series_and_gt_one_axis = value_is_series and gt_than_one_axis

        for axis, items in enumerate(value.axes):
            if is_series_and_gt_one_axis:
                # A Series aligned against a frame joins on the last axis.
                ax, itm = naxes - 1, value.index
            else:
                ax, itm = axis, items

            if not axes[ax].is_(itm):
                axes[ax] = axes[ax].join(itm, how="outer")

    # Reindex each term to the joined axes.
    for i, ndim in ndims.items():
        for axis, items in zip(range(ndim), axes):
            ti = terms[i].value

            if hasattr(ti, "reindex"):
                # BUG FIX: the vendored copy called ``value_is_series(ti)``,
                # but ``value_is_series`` is a *bool* left over from the loop
                # above (TypeError when reached). Upstream pandas uses
                # ``isinstance(ti, ABCSeries)`` here; ``is_series`` is this
                # module's equivalent check.
                transpose = is_series(ti) and naxes > 1
                reindexer = axes[naxes - 1] if transpose else items

                term_axis_size = len(ti.axes[axis])
                reindexer_size = len(reindexer)

                ordm = np.log10(max(1, abs(reindexer_size - term_axis_size)))
                if ordm >= 1 and reindexer_size >= 10000:
                    w = (
                        f"Alignment difference on axis {axis} is larger "
                        f"than an order of magnitude on term {repr(terms[i].name)}, "
                        f"by more than {ordm:.4g}; performance may suffer."
                    )
                    warnings.warn(
                        w, category=PerformanceWarning, stacklevel=find_stack_level()
                    )

                obj = ti.reindex(reindexer, axis=axis, copy=False)
                terms[i].update(obj)

        terms[i].update(terms[i].value.values)

    return typ, _zip_axes_from_type(typ, axes)


def align_terms(terms):
    """
    Align a set of terms.
    """
    try:
        # flatten the parse tree (a nested list, really)
        terms = list(com.flatten(terms))
    except TypeError:
        # can't iterate so it must just be a constant or single variable
        if is_series_or_dataframe(terms.value):
            typ = type(terms.value)
            return typ, _zip_axes_from_type(typ, terms.value.axes)
        return np.result_type(terms.type), None

    if all(term.is_scalar for term in terms):
        # all resolved variables are numeric scalars; no axes to align
        return result_type_many(*(term.value for term in terms)).type, None

    # perform the main alignment
    return _align_core(terms)


def reconstruct_object(typ, obj, axes, dtype):
    """
    Reconstruct an object given its type, raw value, and possibly empty
    (None) axes.

    Parameters
    ----------
    typ : object
        A type
    obj : object
        The value to use in the type constructor
    axes : dict
        The axes to use to construct the resulting pandas object

    Returns
    -------
    ret : typ
        An object of type ``typ`` with the value `obj` and possible axes
        `axes`.
    """
    # Unwrap numpy dtype objects to their scalar type, when present.
    if hasattr(typ, "type"):
        typ = typ.type

    res_t = np.result_type(obj.dtype, dtype)

    if not isinstance(typ, partial) and is_pandas_type(typ):
        return typ(obj, dtype=res_t, **axes)

    # special case for pathological things like ~True/~False
    if hasattr(res_t, "type") and typ == np.bool_ and res_t != np.bool_:
        result = res_t.type(obj)
    else:
        result = typ(obj).astype(res_t)
        # Distinguish a 0-dim array (returned for a scalar) from a genuine
        # 1-element array, e.g. np.array(0) vs np.array([0]).
        if (
            len(obj.shape) == 1
            and len(obj) == 1
            and not isinstance(result, np.ndarray)
        ):
            result = np.array([result]).astype(res_t)

    return result


# Custom to recognize BigFrames types
def is_series(obj) -> bool:
    """True if ``obj`` is a vendored/bigframes Series instance."""
    from bigframes_vendored.pandas.core.series import Series as _Series

    return isinstance(obj, _Series)


def is_series_or_dataframe(obj) -> bool:
    """True if ``obj`` is a vendored NDFrame (covers Series and DataFrame)."""
    from bigframes_vendored.pandas.core.frame import NDFrame as _NDFrame

    return isinstance(obj, _NDFrame)


def is_pandas_object(obj) -> bool:
    """True if ``obj`` is a vendored NDFrame (Series/DataFrame) or Index."""
    from bigframes_vendored.pandas.core.frame import NDFrame
    from bigframes_vendored.pandas.core.indexes.base import Index

    # Single isinstance call with a tuple is equivalent to the two-check `or`.
    return isinstance(obj, (NDFrame, Index))


def is_pandas_type(type) -> bool:
    """True if ``type`` is a subclass of a vendored NDFrame or Index.

    NOTE(review): the parameter shadows the ``type`` builtin; kept for
    interface compatibility with existing callers.
    """
    from bigframes_vendored.pandas.core.frame import NDFrame
    from bigframes_vendored.pandas.core.indexes.base import Index

    # Single issubclass call with a tuple is equivalent to the two-check `or`.
    return issubclass(type, (NDFrame, Index))
Loading