feat: limited support of lambdas in Series.apply #345

Merged
merged 18 commits into from
Feb 12, 2024
18 commits
264becb
feat: limited support of lamdas in `Series.apply`
shobsi Jan 24, 2024
f98940b
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-al…
shobsi Jan 25, 2024
266a3ea
add code sample for non-remote-function `Series.apply`
shobsi Jan 25, 2024
d3f9878
remove ..note in the middle of code samples due to rendering issue
shobsi Jan 25, 2024
af8651a
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-al…
shobsi Jan 25, 2024
89b8dd1
fix typo
shobsi Jan 25, 2024
44b1689
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-al…
shobsi Jan 25, 2024
4ed8f7b
add lambda test coverage and code samples for `Series.mask`
shobsi Jan 26, 2024
6b0dae7
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-al…
shobsi Jan 31, 2024
2603ba1
apply the non-remote function on series level
shobsi Jan 31, 2024
1b93bc2
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-al…
shobsi Feb 6, 2024
1e1551e
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-al…
shobsi Feb 8, 2024
e782877
add suggestion to use remote function if direct func errors out
shobsi Feb 9, 2024
ac200ca
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-al…
shobsi Feb 9, 2024
4357d4f
support by_row param in Series.apply
shobsi Feb 9, 2024
3b51709
raise ValueError instead of AssertionError
shobsi Feb 9, 2024
33d8b8b
fix Series.mask tests
shobsi Feb 10, 2024
6da45f0
Merge branch 'main' into shobs-allow-lambdas
shobsi Feb 12, 2024
45 changes: 43 additions & 2 deletions bigframes/series.py
@@ -58,6 +58,12 @@
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]


_remote_function_recommendation_message = (
"Your functions could not be applied directly to the Series."
" Try converting it to a remote function."
)


@log_adapter.class_logger
class Series(bigframes.operations.base.SeriesMethods, vendored_pandas_series.Series):
def __init__(self, *args, **kwargs):
@@ -1210,12 +1216,43 @@ def _groupby_values(
dropna=dropna,
)

- def apply(self, func) -> Series:
+ def apply(
+ self, func, by_row: typing.Union[typing.Literal["compat"], bool] = "compat"
+ ) -> Series:
# TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs
# is actually a ternary op
# Reproject as workaround to applying filter too late. This forces the filter
# to be applied before passing data to remote function, protecting from bad
# inputs causing errors.

if by_row not in ["compat", False]:
raise ValueError("Param by_row must be one of 'compat' or False")

if not callable(func):
raise ValueError(
"Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported."
)

if not hasattr(func, "bigframes_remote_function"):
# It is not a remote function
# Then it must be a vectorized function that applies to the Series
# as a whole
if by_row:
raise ValueError(
"A vectorized non-remote function can be provided only with by_row=False."
" For element-wise operation it must be a remote function."
)

try:
return func(self)
[Review comment from a Collaborator on the line above]
Since it may result in incorrect values if this isn't a true vectorized function, let's check for by_row=False. If by_row="compat" (default) then raise and suggest either remote_function or by_row=False.

[Reply from the Contributor Author]
Done, PTAL.

except Exception as ex:
# This could happen if any of the operators in func is not
# supported on a Series. Let's guide the customer to use a
# remote function instead
if hasattr(ex, "message"):
ex.message += f"\n{_remote_function_recommendation_message}"
raise

reprojected_series = Series(self._block._force_reproject())
return reprojected_series._apply_unary_op(
ops.RemoteFunctionOp(func=func, apply_on_null=True)
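
The validation order in the new `Series.apply` can be tried out without BigQuery using a toy stand-in. This is only a sketch under stated assumptions: `MiniSeries` and `fake_remote` are hypothetical names invented for illustration, and the "remote" path here simply loops in Python instead of calling a deployed function. Only the `by_row` check, the `callable` check, and the `bigframes_remote_function` attribute test are taken from the hunk above.

```python
import operator

# Attribute name taken from the diff; everything else here is a hypothetical stand-in.
REMOTE_MARKER = "bigframes_remote_function"


class MiniSeries:
    """Toy column type that supports a couple of vectorized operators."""

    def __init__(self, values):
        self.values = list(values)

    def _binary(self, other, op):
        # Pair up with another MiniSeries element by element, or broadcast a scalar.
        if isinstance(other, MiniSeries):
            return MiniSeries(op(a, b) for a, b in zip(self.values, other.values))
        return MiniSeries(op(a, other) for a in self.values)

    def __add__(self, other):
        return self._binary(other, operator.add)

    def __mul__(self, other):
        return self._binary(other, operator.mul)

    def apply(self, func, by_row="compat"):
        if by_row not in ["compat", False]:
            raise ValueError("Param by_row must be one of 'compat' or False")
        if not callable(func):
            raise ValueError("func must be callable")
        if not hasattr(func, REMOTE_MARKER):
            # Not marked as remote: only allowed as a whole-column call.
            if by_row:
                raise ValueError(
                    "A vectorized non-remote function can be provided only with by_row=False."
                )
            return func(self)
        # Stand-in for the remote path: call the function once per element.
        return MiniSeries(func(v) for v in self.values)


def fake_remote(v):
    return v * 2


# Mark the function so the hasattr() check treats it as remote (assumption: any value works).
setattr(fake_remote, REMOTE_MARKER, True)

s = MiniSeries([1, 2, 3])
print(s.apply(lambda x: x * x + x + 1, by_row=False).values)  # [3, 7, 13]
print(s.apply(fake_remote).values)  # [2, 4, 6]
```

Calling `s.apply(lambda x: x + 1)` with the default `by_row="compat"` raises `ValueError` in this sketch, which matches what the new tests in this PR expect from the real method.
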
@@ -1325,7 +1362,11 @@ def duplicated(self, keep: str = "first") -> Series:

def mask(self, cond, other=None) -> Series:
if callable(cond):
- cond = self.apply(cond)
+ if hasattr(cond, "bigframes_remote_function"):
+ cond = self.apply(cond)
+ else:
+ # For non-remote function assume that it is applicable on Series
+ cond = self.apply(cond, by_row=False)

if not isinstance(cond, Series):
raise TypeError(
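
A note on the `except` branch in `Series.apply` earlier in this file: it appends the recommendation only when the exception object has a `message` attribute. Built-in Python 3 exceptions such as `TypeError` and `AttributeError` do not define `message`, so for those the text would not be attached (some library exception classes do define it). The sketch below shows one possible alternative that rewrites `ex.args` instead. It is not the PR's implementation; `call_with_recommendation` is a hypothetical helper, and only the message text is copied from the diff.

```python
RECOMMENDATION = (
    "Your functions could not be applied directly to the Series."
    " Try converting it to a remote function."
)


def call_with_recommendation(func, data):
    """Call func(data); on failure, append the recommendation to the error text and re-raise."""
    try:
        return func(data)
    except Exception as ex:
        if ex.args and isinstance(ex.args[0], str):
            # Most built-in exceptions render str(ex) from args, so extend the first entry.
            ex.args = (f"{ex.args[0]}\n{RECOMMENDATION}",) + ex.args[1:]
        else:
            ex.args = ex.args + (RECOMMENDATION,)
        raise


# Built-in exceptions carry no `message` attribute in Python 3.
print(hasattr(TypeError("boom"), "message"))  # False

try:
    # A list has no .capitalize(), similar to calling a str method on a whole column.
    call_with_recommendation(lambda x: x.capitalize(), [1, 2])
except AttributeError as ex:
    message = str(ex)
    print(message)
```

The original exception type and traceback are preserved because the same object is re-raised with a bare `raise`.
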
151 changes: 151 additions & 0 deletions tests/system/small/test_series.py
@@ -2560,6 +2560,51 @@ def test_mask_custom_value(scalars_dfs):
assert_pandas_df_equal(bf_result, pd_result)


@pytest.mark.parametrize(
("lambda_",),
[
pytest.param(lambda x: x > 0),
pytest.param(
lambda x: True if x > 0 else False,
marks=pytest.mark.xfail(
raises=ValueError,
),
),
],
ids=[
"lambda_arithmatic",
"lambda_arbitrary",
],
)
def test_mask_lambda(scalars_dfs, lambda_):
scalars_df, scalars_pandas_df = scalars_dfs

bf_col = scalars_df["int64_col"]
bf_result = bf_col.mask(lambda_).to_pandas()

pd_col = scalars_pandas_df["int64_col"]
pd_result = pd_col.mask(lambda_)

# ignore dtype check, which are Int64 and object respectively
assert_series_equal(bf_result, pd_result, check_dtype=False)


def test_mask_simple_udf(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs

def foo(x):
return x < 1000000

bf_col = scalars_df["int64_col"]
bf_result = bf_col.mask(foo).to_pandas()

pd_col = scalars_pandas_df["int64_col"]
pd_result = pd_col.mask(foo)

# ignore dtype check, which are Int64 and object respectively
assert_series_equal(bf_result, pd_result, check_dtype=False)
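
The `lambda_arbitrary` case in `test_mask_lambda` is marked `xfail(raises=ValueError)`. A plausible reading is that a conditional expression needs a single truth value, which a whole column cannot provide, while a plain comparison stays element-wise. The sketch below reproduces that distinction with a hypothetical `Column` class; it is an illustration under that assumption, not BigQuery DataFrames code.

```python
class Column:
    """Hypothetical stand-in for a Series with one comparison operator."""

    def __init__(self, values):
        self.values = list(values)

    def __gt__(self, other):
        # Element-wise comparison returns another Column of booleans.
        return Column(v > other for v in self.values)

    def __bool__(self):
        # A whole column has no single truth value.
        raise ValueError("The truth value of a Column is ambiguous")


def vectorized(x):
    return x > 0


def branching(x):
    # The conditional expression calls bool() on the comparison result.
    return True if x > 0 else False


col = Column([-1, 0, 2])
print(vectorized(col).values)  # [False, False, True]

try:
    branching(col)
except ValueError as ex:
    print("ValueError:", ex)
```

Functions of the second kind work on single values only, which is the situation where the PR points users to a remote function.
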


@pytest.mark.parametrize(
("column", "to_type"),
[
@@ -3042,3 +3087,109 @@ def test_series_iter(
scalars_df_index["int64_too"], scalars_pandas_df_index["int64_too"]
):
assert bf_i == pd_i


@pytest.mark.parametrize(
(
"col",
"lambda_",
),
[
pytest.param("int64_col", lambda x: x * x + x + 1),
pytest.param("int64_col", lambda x: x % 2 == 1),
pytest.param("string_col", lambda x: x + "_suffix"),
],
ids=[
"lambda_int_int",
"lambda_int_bool",
"lambda_str_str",
],
)
def test_apply_lambda(scalars_dfs, col, lambda_):
scalars_df, scalars_pandas_df = scalars_dfs

bf_col = scalars_df[col]

# Can't be applied to BigFrames Series without by_row=False
with pytest.raises(ValueError, match="by_row=False"):
bf_col.apply(lambda_)

bf_result = bf_col.apply(lambda_, by_row=False).to_pandas()

pd_col = scalars_pandas_df[col]
pd_result = pd_col.apply(lambda_)

# ignore dtype check, which are Int64 and object respectively
assert_series_equal(bf_result, pd_result, check_dtype=False)


@pytest.mark.parametrize(
("ufunc",),
[
pytest.param(numpy.log),
pytest.param(numpy.sqrt),
pytest.param(numpy.sin),
],
ids=[
"log",
"sqrt",
"sin",
],
)
def test_apply_numpy_ufunc(scalars_dfs, ufunc):
scalars_df, scalars_pandas_df = scalars_dfs

bf_col = scalars_df["int64_col"]

# Can't be applied to BigFrames Series without by_row=False
with pytest.raises(ValueError, match="by_row=False"):
bf_col.apply(ufunc)

bf_result = bf_col.apply(ufunc, by_row=False).to_pandas()

pd_col = scalars_pandas_df["int64_col"]
pd_result = pd_col.apply(ufunc)

assert_series_equal(bf_result, pd_result)


def test_apply_simple_udf(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs

def foo(x):
return x * x + 2 * x + 3

bf_col = scalars_df["int64_col"]

# Can't be applied to BigFrames Series without by_row=False
with pytest.raises(ValueError, match="by_row=False"):
bf_col.apply(foo)

bf_result = bf_col.apply(foo, by_row=False).to_pandas()

pd_col = scalars_pandas_df["int64_col"]
pd_result = pd_col.apply(foo)

# ignore dtype check, which are Int64 and object respectively
assert_series_equal(bf_result, pd_result, check_dtype=False)


@pytest.mark.parametrize(
("col", "lambda_", "exception"),
[
pytest.param("int64_col", {1: 2, 3: 4}, ValueError),
pytest.param("int64_col", numpy.square, TypeError),
pytest.param("string_col", lambda x: x.capitalize(), AttributeError),
],
ids=[
"not_callable",
"numpy_ufunc",
"custom_lambda",
],
)
def test_apply_not_supported(scalars_dfs, col, lambda_, exception):
scalars_df, _ = scalars_dfs

bf_col = scalars_df[col]
with pytest.raises(exception):
bf_col.apply(lambda_, by_row=False)
82 changes: 76 additions & 6 deletions third_party/bigframes_vendored/pandas/core/series.py
@@ -1116,18 +1116,24 @@ def nsmallest(self, n: int = 5, keep: str = "first") -> Series:
def apply(
self,
func,
by_row="compat",
) -> DataFrame | Series:
"""
Invoke function on values of a Series.

Can be ufunc (a NumPy function that applies to the entire Series) or a
Python function that only works on single values. If it is an arbitrary
python function then converting it into a `remote_function` is recommended.

**Examples:**

>>> import bigframes.pandas as bpd
>>> bpd.options.display.progress_bar = None

- Let's use ``reuse=False`` flag to make sure a new ``remote_function``
+ For applying arbitrary python function a `remote_function` is recommended.
+ Let's use ``reuse=False`` flag to make sure a new `remote_function`
is created every time we run the following code, but you can skip it
- to potentially reuse a previously deployed ``remote_function`` from
+ to potentially reuse a previously deployed `remote_function` from
the same user defined function.

>>> @bpd.remote_function([int], float, reuse=False)
@@ -1152,9 +1158,9 @@ def apply(
4 2.0
dtype: Float64

- You could turn a user defined function with external package
- dependencies into a BigQuery DataFrames remote function. You would
- provide the names of the packages via ``packages`` param.
+ To turn a user defined function with external package dependencies into
+ a `remote_function`, you would provide the names of the packages via
+ `packages` param.

>>> @bpd.remote_function(
... [str],
@@ -1176,11 +1182,48 @@ def apply(
>>> names = bpd.Series(["Alice", "Bob"])
>>> hashes = names.apply(get_hash)

Simple vectorized functions, lambdas or ufuncs can be applied directly
with `by_row=False`.

>>> nums = bpd.Series([1, 2, 3, 4])
>>> nums
0 1
1 2
2 3
3 4
dtype: Int64
>>> nums.apply(lambda x: x*x + 2*x + 1, by_row=False)
0 4
1 9
2 16
3 25
dtype: Int64

>>> def is_odd(num):
... return num % 2 == 1
>>> nums.apply(is_odd, by_row=False)
0 True
1 False
2 True
3 False
dtype: boolean

>>> nums.apply(np.log, by_row=False)
0 0.0
1 0.693147
2 1.098612
3 1.386294
dtype: Float64

Args:
func (function):
BigFrames DataFrames ``remote_function`` to apply. The function
should take a scalar and return a scalar. It will be applied to
every element in the ``Series``.
by_row (False or "compat", default "compat"):
If `"compat"` , func must be a remote function which will be
passed each element of the Series, like `Series.map`. If False,
the func will be passed the whole Series at once.

Returns:
bigframes.series.Series: A new Series with values representing the
@@ -2680,7 +2723,8 @@ def mask(self, cond, other):
dtype: Int64

You can mask the values in the Series based on a condition. The values
- matching the condition would be masked.
+ matching the condition would be masked. The condition can be provided in
+ form of a Series.

>>> s.mask(s % 2 == 0)
0 <NA>
@@ -2736,6 +2780,32 @@ def mask(self, cond, other):
2 Caroline
dtype: string

Simple vectorized (i.e. they only perform operations supported on a
Series) lambdas or python functions can be used directly.

>>> nums = bpd.Series([1, 2, 3, 4], name="nums")
>>> nums
0 1
1 2
2 3
3 4
Name: nums, dtype: Int64
>>> nums.mask(lambda x: (x+1) % 2 == 1)
0 1
1 <NA>
2 3
3 <NA>
Name: nums, dtype: Int64

>>> def is_odd(num):
... return num % 2 == 1
>>> nums.mask(is_odd)
0 <NA>
1 2
2 <NA>
3 4
Name: nums, dtype: Int64

Args:
cond (bool Series/DataFrame, array-like, or callable):
Where cond is False, keep the original value. Where True, replace
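
The `mask` semantics documented above (keep the value where the condition is False, replace it where True) can be sketched on plain lists. The `mask` function below is a hypothetical stand-in, not the library method: `None` plays the role of `<NA>`, and a callable condition is called once with the whole list, analogous to the `by_row=False` path that the new `Series.mask` code takes for non-remote functions.

```python
def mask(values, cond, other=None):
    """Replace values where cond is True; cond may be a list of bools or a callable."""
    if callable(cond):
        # Call once with the whole list, analogous to by_row=False in the diff.
        cond = cond(values)
    if len(cond) != len(values):
        raise ValueError("cond must have the same length as values")
    return [other if flag else value for value, flag in zip(values, cond)]


def is_odd(nums):
    return [n % 2 == 1 for n in nums]


nums = [1, 2, 3, 4]
print(mask(nums, is_odd))  # [None, 2, None, 4]
print(mask(nums, lambda xs: [(x + 1) % 2 == 1 for x in xs]))  # [1, None, 3, None]
print(mask(nums, [True, False, False, False], other=0))  # [0, 2, 3, 4]
```

The first two results line up with the `nums.mask(is_odd)` and `nums.mask(lambda x: (x+1) % 2 == 1)` examples in the docstring, with `None` in place of `<NA>`.
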