diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 151da51792..2349e469ab 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4199,11 +4199,13 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): udf_input_dtypes = getattr(func, "input_dtypes") if len(udf_input_dtypes) != len(self.columns): raise ValueError( - f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns." + f"BigFrames BigQuery function takes {len(udf_input_dtypes)}" + f" arguments but DataFrame has {len(self.columns)} columns." ) if udf_input_dtypes != tuple(self.dtypes.to_list()): raise ValueError( - f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." + f"BigFrames BigQuery function takes arguments of types " + f"{udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." ) series_list = [self[col] for col in self.columns] diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 15c8cb979e..0ae674b97d 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -892,6 +892,7 @@ def wrapper(func): func = cloudpickle.loads(cloudpickle.dumps(func)) self._try_delattr(func, "bigframes_bigquery_function") + self._try_delattr(func, "bigframes_bigquery_function_output_dtype") self._try_delattr(func, "input_dtypes") self._try_delattr(func, "output_dtype") self._try_delattr(func, "is_row_processor") @@ -951,6 +952,10 @@ def wrapper(func): ibis_signature.output_type ) ) + # Managed function directly supports certain output types which are + # not supported in remote function (e.g. list output). Thus no more + # processing for 'bigframes_bigquery_function_output_dtype'. 
+ func.bigframes_bigquery_function_output_dtype = func.output_dtype func.is_row_processor = is_row_processor func.ibis_node = node diff --git a/bigframes/operations/remote_function_ops.py b/bigframes/operations/remote_function_ops.py index 8505fd1607..51cfccbc41 100644 --- a/bigframes/operations/remote_function_ops.py +++ b/bigframes/operations/remote_function_ops.py @@ -29,11 +29,12 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method + # The output dtype should be set to a valid Dtype by @udf decorator, + # @remote_function decorator, or read_gbq_function method. if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - else: - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") @dataclasses.dataclass(frozen=True) @@ -46,11 +47,12 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method + # The output dtype should be set to a valid Dtype by @udf decorator, + # @remote_function decorator, or read_gbq_function method. 
if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - else: - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") @dataclasses.dataclass(frozen=True) @@ -63,8 +65,9 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method + # The output dtype should be set to a valid Dtype by @udf decorator, + # @remote_function decorator, or read_gbq_function method. if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - else: - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 4db7a1c47c..503720edcc 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -13,8 +13,10 @@ # limitations under the License. 
import pandas +import pyarrow import pytest +import bigframes from bigframes.functions import _function_session as bff_session from bigframes.functions._utils import get_python_version import bigframes.pandas as bpd @@ -164,3 +166,161 @@ def func(x, y): cleanup_function_assets( session.bqclient, session.cloudfunctionsclient, managed_func ) + + +@pytest.mark.parametrize( + "array_dtype", + [ + bool, + int, + float, + str, + ], +) +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype): + try: + + @session.udf(dataset=dataset_id) + def featurize(x: int) -> list[array_dtype]: # type: ignore + return [array_dtype(i) for i in [x, x + 1, x + 2]] + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_int64_col = scalars_df["int64_too"] + bf_result = bf_int64_col.apply(featurize).to_pandas() + + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result = pd_int64_col.apply(featurize) + + # Ignore any dtype disparity. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + finally: + # Clean up the gcp assets created for the managed function. 
+ cleanup_function_assets( + featurize, session.bqclient, session.cloudfunctionsclient + ) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_binop_array_output(session, scalars_dfs, dataset_id): + try: + + def func(x, y): + return [len(x), abs(y % 4)] + + managed_func = session.udf( + input_types=[str, int], + output_type=list[int], + dataset=dataset_id, + )(func) + + scalars_df, scalars_pandas_df = scalars_dfs + + scalars_df = scalars_df.dropna() + scalars_pandas_df = scalars_pandas_df.dropna() + bf_result = ( + scalars_df["string_col"] + .combine(scalars_df["int64_col"], managed_func) + .to_pandas() + ) + pd_result = scalars_pandas_df["string_col"].combine( + scalars_pandas_df["int64_col"], func + ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # Clean up the gcp assets created for the managed function. + cleanup_function_assets( + managed_func, session.bqclient, session.cloudfunctionsclient + ) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_df_apply_axis_1_array_output(session): + bf_df = bigframes.dataframe.DataFrame( + { + "Id": [1, 2, 3], + "Age": [22.5, 23, 23.5], + "Name": ["alpha", "beta", "gamma"], + } + ) + + expected_dtypes = ( + bigframes.dtypes.INT_DTYPE, + bigframes.dtypes.FLOAT_DTYPE, + bigframes.dtypes.STRING_DTYPE, + ) + + # Assert the dataframe dtypes. 
+ assert tuple(bf_df.dtypes) == expected_dtypes + + try: + + @session.udf(input_types=[int, float, str], output_type=list[str]) + def foo(x, y, z): + return [str(x), str(y), z] + + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes + assert getattr(foo, "output_dtype") == pandas.ArrowDtype( + pyarrow.list_( + bigframes.dtypes.bigframes_dtype_to_arrow_dtype( + bigframes.dtypes.STRING_DTYPE + ) + ) + ) + assert getattr(foo, "output_dtype") == getattr( + foo, "bigframes_bigquery_function_output_dtype" + ) + + # Fails to apply on dataframe with incompatible number of columns. + with pytest.raises( + ValueError, + match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", + ): + bf_df[["Id", "Age"]].apply(foo, axis=1) + + with pytest.raises( + ValueError, + match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) + + # Fails to apply on dataframe with incompatible column datatypes. + with pytest.raises( + ValueError, + match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns. + # and their datatypes. + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result. + expected_result = pandas.Series( + [ + ["1", "22.5", "alpha"], + ["2", "23.0", "beta"], + ["3", "23.5", "gamma"], + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + + finally: + # Clean up the gcp assets created for the managed function. 
+ cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 350eae3783..65bf20b966 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -2085,19 +2085,19 @@ def foo(x, y, z): # Fails to apply on dataframe with incompatible number of columns with pytest.raises( ValueError, - match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", + match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", ): bf_df[["Id", "Age"]].apply(foo, axis=1) with pytest.raises( ValueError, - match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", + match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", ): bf_df.assign(Country="lalaland").apply(foo, axis=1) # Fails to apply on dataframe with incompatible column datatypes with pytest.raises( ValueError, - match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", ): bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) @@ -2171,19 +2171,19 @@ def foo(x, y, z): # Fails to apply on dataframe with incompatible number of columns with pytest.raises( ValueError, - match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", + match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", ): bf_df[["Id", "Age"]].apply(foo, axis=1) with pytest.raises( ValueError, - match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", + match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", ): bf_df.assign(Country="lalaland").apply(foo, axis=1) # Fails to apply on dataframe with incompatible column datatypes with pytest.raises( 
ValueError, - match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", ): bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) @@ -2240,19 +2240,19 @@ def foo(x): # Fails to apply on dataframe with incompatible number of columns with pytest.raises( ValueError, - match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$", + match="^BigFrames BigQuery function takes 1 arguments but DataFrame has 0 columns\\.$", ): bf_df[[]].apply(foo, axis=1) with pytest.raises( ValueError, - match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$", + match="^BigFrames BigQuery function takes 1 arguments but DataFrame has 2 columns\\.$", ): bf_df.assign(Country="lalaland").apply(foo, axis=1) # Fails to apply on dataframe with incompatible column datatypes with pytest.raises( ValueError, - match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", ): bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1) diff --git a/tests/system/small/functions/test_managed_function.py b/tests/system/small/functions/test_managed_function.py index 41a5785d01..e1af68512a 100644 --- a/tests/system/small/functions/test_managed_function.py +++ b/tests/system/small/functions/test_managed_function.py @@ -62,6 +62,9 @@ def foo(x): assert hasattr(foo, "bigframes_bigquery_function") assert hasattr(foo, "ibis_node") + assert hasattr(foo, "input_dtypes") + assert hasattr(foo, "output_dtype") + assert hasattr(foo, "bigframes_bigquery_function_output_dtype") scalars_df, scalars_pandas_df = scalars_dfs @@ -124,6 +127,88 @@ def add(x: int, y: int) -> int: pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + 
reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +@pytest.mark.parametrize( + ("typ",), + [ + pytest.param(int), + pytest.param(float), + pytest.param(bool), + pytest.param(str), + ], +) +def test_managed_function_series_apply_list_output( + typ, + scalars_dfs, + dataset_id_permanent, +): + def foo_list(x): + # Use abs() so the values are non-negative and safely convertible by + # every parametrized output type. + return [typ(abs(x)), typ(abs(x) + 1)] + + foo_list = udf( + input_types=int, + output_type=list[typ], # type: ignore + dataset=dataset_id_permanent, + name=get_function_name(foo_list), + )(foo_list) + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result_col = scalars_df["int64_too"].apply(foo_list) + bf_result = ( + scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() + ) + + pd_result_col = scalars_pandas_df["int64_too"].apply(foo_list) + pd_result = scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) + + # Ignore any dtype difference. + assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_series_combine_list_output(dataset_id_permanent, scalars_dfs): + def add_list(x: int, y: int) -> list[int]: + return [x, y] + + scalars_df, scalars_pandas_df = scalars_dfs + int_col_name_with_nulls = "int64_col" + int_col_name_no_nulls = "int64_too" + bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]] + pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]] + + # Make sure there are NA values in the test column. 
+ assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]]) + + add_list_managed_func = udf( + dataset=dataset_id_permanent, + name=get_function_name(add_list), + )(add_list) + + # After filtering out nulls the managed function application should work + # similar to pandas. + pd_filter = pd_df[int_col_name_with_nulls].notnull() + pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine( + pd_df[pd_filter][int_col_name_no_nulls], add_list + ) + bf_filter = bf_df[int_col_name_with_nulls].notnull() + bf_result = ( + bf_df[bf_filter][int_col_name_with_nulls] + .combine(bf_df[bf_filter][int_col_name_no_nulls], add_list_managed_func) + .to_pandas() + ) + + # Ignore any dtype difference. + pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + @pytest.mark.skipif( get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", @@ -197,3 +282,73 @@ def add_ints(x, y): pd.testing.assert_series_equal( pd_result, bf_result, check_dtype=False, check_exact=True ) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_dataframe_map_list_output(scalars_dfs, dataset_id_permanent): + def add_one_list(x): + return [x + 1] * 3 + + mf_add_one_list = udf( + input_types=[int], + output_type=list[int], + dataset=dataset_id_permanent, + name=get_function_name(add_one_list), + )(add_one_list) + + scalars_df, scalars_pandas_df = scalars_dfs + int64_cols = ["int64_col", "int64_too"] + + bf_int64_df = scalars_df[int64_cols] + bf_int64_df_filtered = bf_int64_df.dropna() + bf_result = bf_int64_df_filtered.map(mf_add_one_list).to_pandas() + + pd_int64_df = scalars_pandas_df[int64_cols] + pd_int64_df_filtered = pd_int64_df.dropna() + pd_result = pd_int64_df_filtered.map(add_one_list) + + # Ignore any dtype difference. 
+ assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + +@pytest.mark.skipif( + get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, + reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", +) +def test_managed_function_dataframe_apply_axis_1_list_output( + session, scalars_dfs, dataset_id_permanent +): + scalars_df, scalars_pandas_df = scalars_dfs + series = scalars_df["int64_too"] + series_pandas = scalars_pandas_df["int64_too"] + + def add_ints_list(x, y): + return [x + y] * 2 + + add_ints_list_mf = session.udf( + input_types=[int, int], + output_type=list[int], + dataset=dataset_id_permanent, + name=get_function_name(add_ints_list, is_row_processor=True), + )(add_ints_list) + assert add_ints_list_mf.bigframes_bigquery_function # type: ignore + + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="axis=1 scenario is in preview.", + ): + bf_result = ( + bpd.DataFrame({"x": series, "y": series}) + .apply(add_ints_list_mf, axis=1) + .to_pandas() + ) + + pd_result = pd.DataFrame({"x": series_pandas, "y": series_pandas}).apply( + lambda row: add_ints_list(row["x"], row["y"]), axis=1 + ) + + # Ignore any dtype difference. + pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)