From e24944c2f8e14085a4682fa281fe9bcf09de2a61 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 17 May 2024 21:07:01 +0000 Subject: [PATCH 1/8] feat: allow functions decorated with `@bpd.remote_function` to execute locally --- bigframes/core/compile/scalar_op_compiler.py | 10 +-- bigframes/functions/remote_function.py | 21 +++--- tests/system/small/test_remote_function.py | 70 ++++++++++++++------ 3 files changed, 65 insertions(+), 36 deletions(-) diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index e8e5a1f3ac..a79a4eceab 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -856,11 +856,12 @@ def to_timestamp_op_impl(x: ibis_types.Value, op: ops.ToTimestampOp): @scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True) def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): - if not hasattr(op.func, "bigframes_remote_function"): + ibis_node = getattr(op.func, "ibis_node", None) + if ibis_node is None: raise TypeError( f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}" ) - x_transformed = op.func(x) + x_transformed = ibis_node(x) if not op.apply_on_null: x_transformed = ibis.case().when(x.isnull(), x).else_(x_transformed).end() return x_transformed @@ -1342,11 +1343,12 @@ def minimum_impl( def binary_remote_function_op_impl( x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp ): - if not hasattr(op.func, "bigframes_remote_function"): + ibis_node = getattr(op.func, "ibis_node", None) + if ibis_node is None: raise TypeError( f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}" ) - x_transformed = op.func(x, y) + x_transformed = ibis_node(x, y) return x_transformed diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 9eff802cc7..b0429cf5c2 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1018,11 +1018,11 @@ def remote_function( bq_connection_manager = None if session is None else session.bqconnectionmanager - def wrapper(f): - if not callable(f): - raise TypeError("f must be callable, got {}".format(f)) + def wrapper(func): + if not callable(func): + raise TypeError("f must be callable, got {}".format(func)) - signature = inspect.signature(f) + signature = inspect.signature(func) # TODO(b/340898611): fix type error ibis_signature = ibis_signature_from_python_signature( signature, input_types, output_type # type: ignore @@ -1043,7 +1043,7 @@ def wrapper(f): ) rf_name, cf_name = remote_function_client.provision_bq_remote_function( - f, + func, ibis_signature.input_types, ibis_signature.output_type, reuse, @@ -1058,19 +1058,20 @@ def wrapper(f): # TODO: Move ibis logic to compiler step node = ibis.udf.scalar.builtin( - f, + func, name=rf_name, schema=f"{dataset_ref.project}.{dataset_ref.dataset_id}", signature=(ibis_signature.input_types, ibis_signature.output_type), ) - node.bigframes_cloud_function = ( + func.bigframes_cloud_function = ( remote_function_client.get_cloud_function_fully_qualified_name(cf_name) ) - node.bigframes_remote_function = str(dataset_ref.routine(rf_name)) # type: ignore - node.output_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype( + func.bigframes_remote_function = str(dataset_ref.routine(rf_name)) # type: ignore + func.output_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype( ibis_signature.output_type ) - return node + func.ibis_node = node + return func return wrapper diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 4a39e75ff9..096a268441 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -67,7 +67,7 @@ def bq_cf_connection_location_project(bigquery_client) -> str: @pytest.fixture(scope="module") def bq_cf_connection_location_project_mismatched() -> str: - """Pre-created BQ connection in the migframes-metrics project in US location, + """Pre-created BQ connection in the bigframes-metrics project in US location, in format PROJECT_ID.LOCATION.CONNECTION_NAME, used to invoke cloud function. $ bq show --connection --location=us --project_id=PROJECT_ID bigframes-rf-conn @@ -108,11 +108,15 @@ def test_remote_function_direct_no_session_param( reuse=True, ) def square(x): - # This executes on a remote function, where coverage isn't tracked. - return x * x # pragma: NO COVER + return x * x - assert square.bigframes_remote_function - assert square.bigframes_cloud_function + # Function should still work normally. + assert square(2) == 4 + + # Function should have extra metadata attached for remote execution. + assert hasattr(square, "bigframes_remote_function") + assert hasattr(square, "bigframes_cloud_function") + assert hasattr(square, "ibis_node") scalars_df, scalars_pandas_df = scalars_dfs @@ -161,8 +165,10 @@ def test_remote_function_direct_no_session_param_location_specified( reuse=True, ) def square(x): - # This executes on a remote function, where coverage isn't tracked. - return x * x # pragma: NO COVER + return x * x + + # Function should still work normally. + assert square(2) == 4 scalars_df, scalars_pandas_df = scalars_dfs @@ -197,7 +203,10 @@ def test_remote_function_direct_no_session_param_location_mismatched( dataset_id_permanent, bq_cf_connection_location_mismatched, ): - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match=re.escape("The location does not match BigQuery connection location:"), + ): @rf.remote_function( [int], @@ -212,7 +221,8 @@ def test_remote_function_direct_no_session_param_location_mismatched( reuse=True, ) def square(x): - # This executes on a remote function, where coverage isn't tracked. + # Not expected to reach this code, as the location of the + # connection doesn't match the location of the dataset. return x * x # pragma: NO COVER @@ -239,8 +249,10 @@ def test_remote_function_direct_no_session_param_location_project_specified( reuse=True, ) def square(x): - # This executes on a remote function, where coverage isn't tracked. - return x * x # pragma: NO COVER + return x * x + + # Function should still work normally. + assert square(2) == 4 scalars_df, scalars_pandas_df = scalars_dfs @@ -275,7 +287,12 @@ def test_remote_function_direct_no_session_param_project_mismatched( dataset_id_permanent, bq_cf_connection_location_project_mismatched, ): - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match=re.escape( + "The project_id does not match BigQuery connection gcp_project_id:" + ), + ): @rf.remote_function( [int], @@ -290,7 +307,8 @@ def test_remote_function_direct_no_session_param_project_mismatched( reuse=True, ) def square(x): - # This executes on a remote function, where coverage isn't tracked. + # Not expected to reach this code, as the project of the + # connection doesn't match the project of the dataset. return x * x # pragma: NO COVER @@ -302,8 +320,10 @@ def test_remote_function_direct_session_param(session_with_bq_connection, scalar session=session_with_bq_connection, ) def square(x): - # This executes on a remote function, where coverage isn't tracked. - return x * x # pragma: NO COVER + return x * x + + # Function should still work normally. + assert square(2) == 4 scalars_df, scalars_pandas_df = scalars_dfs @@ -340,8 +360,10 @@ def test_remote_function_via_session_default(session_with_bq_connection, scalars # cloud function would be common and quickly reused. @session_with_bq_connection.remote_function([int], int) def square(x): - # This executes on a remote function, where coverage isn't tracked. - return x * x # pragma: NO COVER + return x * x + + # Function should still work normally. + assert square(2) == 4 scalars_df, scalars_pandas_df = scalars_dfs @@ -380,8 +402,10 @@ def test_remote_function_via_session_with_overrides( reuse=True, ) def square(x): - # This executes on a remote function, where coverage isn't tracked. - return x * x # pragma: NO COVER + return x * x + + # Function should still work normally. + assert square(2) == 4 scalars_df, scalars_pandas_df = scalars_dfs @@ -508,7 +532,7 @@ def test_skip_bq_connection_check(dataset_id_permanent): @session.remote_function([int], int, dataset=dataset_id_permanent) def add_one(x): - # This executes on a remote function, where coverage isn't tracked. + # Not expected to reach this code, as the connection doesn't exist. return x + 1 # pragma: NO COVER @@ -546,8 +570,10 @@ def test_read_gbq_function_like_original( reuse=True, ) def square1(x): - # This executes on a remote function, where coverage isn't tracked. - return x * x # pragma: NO COVER + return x * x + + # Function should still work normally. + assert square1(2) == 4 square2 = rf.read_gbq_function( function_name=square1.bigframes_remote_function, From 0475c39404d1977d1909c5eac1a042677fcb99ac Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 17 May 2024 21:58:53 +0000 Subject: [PATCH 2/8] fix read_gbq_function --- bigframes/functions/remote_function.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index b0429cf5c2..1011837a8d 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1122,19 +1122,23 @@ def read_gbq_function( # The name "args" conflicts with the Ibis operator, so we use # non-standard names for the arguments here. - def node(*ignored_args, **ignored_kwargs): + def func(*ignored_args, **ignored_kwargs): f"""Remote function {str(routine_ref)}.""" + # TODO(swast): Construct an ibis client from bigquery_client and + # execute node via a query. # TODO: Move ibis logic to compiler step - node.__name__ = routine_ref.routine_id + func.__name__ = routine_ref.routine_id + node = ibis.udf.scalar.builtin( - node, + func, name=routine_ref.routine_id, schema=f"{routine_ref.project}.{routine_ref.dataset_id}", signature=(ibis_signature.input_types, ibis_signature.output_type), ) - node.bigframes_remote_function = str(routine_ref) # type: ignore - node.output_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype( # type: ignore + func.bigframes_remote_function = str(routine_ref) # type: ignore + func.output_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype( # type: ignore ibis_signature.output_type ) - return node + func.ibis_node = node # type: ignore + return func From 47b49a9dd8a4bdf1fd1c41dfeae4ac47452a0dbf Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 17 May 2024 22:12:52 +0000 Subject: [PATCH 3/8] feat: allow functions returned from `bpd.read_gbq_function` to execute outside of `apply` --- bigframes/functions/remote_function.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 1011837a8d..de541acb9d 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1094,6 +1094,11 @@ def read_gbq_function( f"{constants.FEEDBACK_LINK}" ) + if session: + ibis_client = session.ibis_client + else: + ibis_client = ibis.bigquery.connect(client=bigquery_client) + try: routine_ref = get_routine_reference(function_name, bigquery_client, session) except DatasetMissingError: @@ -1124,8 +1129,10 @@ def read_gbq_function( # non-standard names for the arguments here. def func(*ignored_args, **ignored_kwargs): f"""Remote function {str(routine_ref)}.""" - # TODO(swast): Construct an ibis client from bigquery_client and - # execute node via a query. + nonlocal node # type: ignore + + expr = node(*ignored_args, **ignored_kwargs) # type: ignore + return ibis_client.execute(expr) # TODO: Move ibis logic to compiler step func.__name__ = routine_ref.routine_id From cc11de54a08aafe67f6803c9eb4a1f1475e1e0f4 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 20 May 2024 15:55:18 +0000 Subject: [PATCH 4/8] fix for rare case where re-deploy exact same function object --- bigframes/functions/remote_function.py | 15 +++++++++++++++ tests/system/large/test_remote_function.py | 5 ++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 1011837a8d..862b882c80 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1042,6 +1042,21 @@ def wrapper(func): cloud_function_docker_repository, ) + # In the unlikely case where the user is trying to re-deploy the same + # function, cleanup the attributes we add below, first. This prevents + # the pickle from having dependencies that might not otherwise be + # present such as ibis or pandas. + def try_delattr(attr): + try: + delattr(func, attr) + except AttributeError: + pass + + try_delattr("bigframes_cloud_function") + try_delattr("bigframes_remote_function") + try_delattr("output_dtype") + try_delattr("ibis_node") + rf_name, cf_name = remote_function_client.provision_bq_remote_function( func, ibis_signature.input_types, diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index cac8483b5b..4114eaae03 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -194,6 +194,9 @@ def test_remote_function_stringify_with_ibis( def stringify(x): return f"I got {x}" + # Function should work locally. + assert stringify(42) == "I got 42" + _, dataset_name, table_name = scalars_table_id.split(".") if not ibis_client.dataset: ibis_client.dataset = dataset_name @@ -205,7 +208,7 @@ def stringify(x): pandas_df_orig = bigquery_client.query(sql).to_dataframe() col = table[col_name] - col_2x = stringify(col).name("int64_str_col") + col_2x = stringify.ibis_node(col).name("int64_str_col") table = table.mutate([col_2x]) sql = table.compile() pandas_df_new = bigquery_client.query(sql).to_dataframe() From 395b9eee433413705ad6bd0dae5b1675bda03fcf Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 22 May 2024 19:08:21 +0000 Subject: [PATCH 5/8] feat: support type annotations to supply input and output types to `@remote_function` decorator --- bigframes/functions/remote_function.py | 85 +++++++++++++------ tests/unit/resources.py | 8 +- tests/unit/test_pandas.py | 2 +- tests/unit/test_remote_function.py | 39 +++++++++ .../bigframes_vendored/pandas/core/frame.py | 16 ++-- .../bigframes_vendored/pandas/core/series.py | 16 ++-- 6 files changed, 121 insertions(+), 45 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 9eff802cc7..e7fc72d68a 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -24,14 +24,15 @@ import sys import tempfile import textwrap -from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union +import typing +from typing import Any, List, Mapping, NamedTuple, Optional, Sequence, Union import warnings import ibis import pandas import requests -if TYPE_CHECKING: +if typing.TYPE_CHECKING: from bigframes.session import Session import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes @@ -735,8 +736,8 @@ def get_routine_reference( # which has moved as @js to the ibis package # https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/udf/__init__.py def remote_function( - input_types: Union[type, Sequence[type]], - output_type: type, + input_types: Union[None, type, Sequence[type]] = None, + output_type: Optional[type] = None, session: Optional[Session] = None, bigquery_client: Optional[bigquery.Client] = None, bigquery_connection_client: Optional[ @@ -800,11 +801,11 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types (type or sequence(type)): + input_types (None, type, or sequence(type)): For scalar user defined function it should be the input type or sequence of input types. For row processing user defined function, type `Series` should be specified. - output_type (type): + output_type (Optional[type]): Data type of the output in the user defined function. session (bigframes.Session, Optional): BigQuery DataFrames session to use for getting default project, @@ -907,26 +908,9 @@ def remote_function( service(s) that are on a VPC network. See for more details https://cloud.google.com/functions/docs/networking/connecting-vpc. """ - is_row_processor = False - - import bigframes.series - - if input_types == bigframes.series.Series: - warnings.warn( - "input_types=Series scenario is in preview.", - stacklevel=1, - category=bigframes.exceptions.PreviewWarning, - ) - - # we will model the row as a json serialized string containing the data - # and the metadata representing the row - input_types = [str] - is_row_processor = True - elif isinstance(input_types, type): - input_types = [input_types] - # Some defaults may be used from the session if not provided otherwise import bigframes.pandas as bpd + import bigframes.series session = session or bpd.get_global_session() @@ -1019,10 +1003,61 @@ def remote_function( bq_connection_manager = None if session is None else session.bqconnectionmanager def wrapper(f): + nonlocal input_types, output_type + if not callable(f): raise TypeError("f must be callable, got {}".format(f)) - signature = inspect.signature(f) + if sys.version_info >= (3, 10): + # Add `eval_str = True` so that deferred annotations are turned into their + # corresponding type objects. Need Python 3.10 for eval_str parameter. + # https://docs.python.org/3/library/inspect.html#inspect.signature + signature_kwargs: Mapping[str, Any] = {"eval_str": True} + else: + signature_kwargs = {} + + signature = inspect.signature( + f, + **signature_kwargs, + ) + + # Try to get input types via type annotations. + if input_types is None: + input_types = [] + for parameter in signature.parameters.values(): + if (param_type := parameter.annotation) is inspect.Signature.empty: + raise ValueError( + "'input_types' was not set and parameter " + f"'{parameter.name}' is missing a type annotation. " + "Types are required to use @remote_function." + ) + input_types.append(param_type) + + if output_type is None: + if (output_type := signature.return_annotation) is inspect.Signature.empty: + raise ValueError( + "'output_type' was not set and function is missing a " + "return type annotation. Types are required to use " + "@remote_function." + ) + + # The function will actually be receiving a pandas Series, but allow both + # BigQuery DataFrames and pandas object types for compatibility. + is_row_processor = False + if input_types == bigframes.series.Series or input_types == pandas.Series: + warnings.warn( + "input_types=Series scenario is in preview.", + stacklevel=1, + category=bigframes.exceptions.PreviewWarning, + ) + + # we will model the row as a json serialized string containing the data + # and the metadata representing the row + input_types = [str] + is_row_processor = True + elif isinstance(input_types, type): + input_types = [input_types] + # TODO(b/340898611): fix type error ibis_signature = ibis_signature_from_python_signature( signature, input_types, output_type # type: ignore diff --git a/tests/unit/resources.py b/tests/unit/resources.py index 623af93f65..4d7998903c 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -39,6 +39,7 @@ def create_bigquery_session( session_id: str = "abcxyz", table_schema: Sequence[google.cloud.bigquery.SchemaField] = TEST_SCHEMA, anonymous_dataset: Optional[google.cloud.bigquery.DatasetReference] = None, + location: str = "test-region", ) -> bigframes.Session: credentials = mock.create_autospec( google.auth.credentials.Credentials, instance=True @@ -53,11 +54,12 @@ def create_bigquery_session( if bqclient is None: bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" + bqclient.location = location # Mock the location. table = mock.create_autospec(google.cloud.bigquery.Table, instance=True) table._properties = {} - type(table).location = mock.PropertyMock(return_value="test-region") + type(table).location = mock.PropertyMock(return_value=location) type(table).schema = mock.PropertyMock(return_value=table_schema) type(table).reference = mock.PropertyMock( return_value=anonymous_dataset.table("test_table") @@ -93,9 +95,7 @@ def query_mock(query, *args, **kwargs): type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient) clients_provider._credentials = credentials - bqoptions = bigframes.BigQueryOptions( - credentials=credentials, location="test-region" - ) + bqoptions = bigframes.BigQueryOptions(credentials=credentials, location=location) session = bigframes.Session(context=bqoptions, clients_provider=clients_provider) return session diff --git a/tests/unit/test_pandas.py b/tests/unit/test_pandas.py index 70639315be..54a7a79d3c 100644 --- a/tests/unit/test_pandas.py +++ b/tests/unit/test_pandas.py @@ -50,7 +50,7 @@ def all_session_methods(): [(method_name,) for method_name in all_session_methods()], ) def test_method_matches_session(method_name: str): - if sys.version_info <= (3, 10): + if sys.version_info < (3, 10): pytest.skip( "Need Python 3.10 to reconcile deferred annotations." ) # pragma: no cover diff --git a/tests/unit/test_remote_function.py b/tests/unit/test_remote_function.py index 1acff27c7f..ae9ab296c5 100644 --- a/tests/unit/test_remote_function.py +++ b/tests/unit/test_remote_function.py @@ -14,8 +14,11 @@ import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes from ibis.expr import datatypes as ibis_types +import pytest import bigframes.dtypes +import bigframes.functions.remote_function +from tests.unit import resources def test_supported_types_correspond(): @@ -29,3 +32,39 @@ def test_supported_types_correspond(): } assert ibis_types_from_python == ibis_types_from_bigquery + + +def test_missing_input_types(): + session = resources.create_bigquery_session() + remote_function_decorator = bigframes.functions.remote_function.remote_function( + session=session + ) + + def function_without_parameter_annotations(myparam) -> str: + return str(myparam) + + assert function_without_parameter_annotations(42) == "42" + + with pytest.raises( + ValueError, + match="'input_types' was not set .* 'myparam' is missing a type annotation", + ): + remote_function_decorator(function_without_parameter_annotations) + + +def test_missing_output_type(): + session = resources.create_bigquery_session() + remote_function_decorator = bigframes.functions.remote_function.remote_function( + session=session + ) + + def function_without_return_annotation(myparam: int): + return str(myparam) + + assert function_without_return_annotation(42) == "42" + + with pytest.raises( + ValueError, + match="'output_type' was not set .* missing a return type annotation", + ): + remote_function_decorator(function_without_return_annotation) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 31d5e88c7e..d9f1081267 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -3892,8 +3892,8 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: to potentially reuse a previously deployed ``remote_function`` from the same user defined function. - >>> @bpd.remote_function(int, float, reuse=False) - ... def minutes_to_hours(x): + >>> @bpd.remote_function(reuse=False) + ... def minutes_to_hours(x: int) -> float: ... return x/60 >>> df_minutes = bpd.DataFrame( @@ -4214,6 +4214,7 @@ def apply(self, func, *, axis=0, args=(), **kwargs): **Examples:** >>> import bigframes.pandas as bpd + >>> import pandas as pd >>> bpd.options.display.progress_bar = None >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}) @@ -4235,16 +4236,19 @@ def apply(self, func, *, axis=0, args=(), **kwargs): [2 rows x 2 columns] You could apply a user defined function to every row of the DataFrame by - creating a remote function out of it, and using it with `axis=1`. + creating a remote function out of it, and using it with `axis=1`. Within + the function, each row is passed as a ``pandas.Series``. It is recommended + to select only the necessary columns before calling `apply()`. Note: This + feature is currently in **preview**. - >>> @bpd.remote_function(bpd.Series, int, reuse=False) - ... def foo(row): + >>> @bpd.remote_function(reuse=False) + ... def foo(row: pd.Series) -> int: ... result = 1 ... result += row["col1"] ... result += row["col2"]*row["col2"] ... return result - >>> df.apply(foo, axis=1) + >>> df[["col1", "col2"]].apply(foo, axis=1) 0 11 1 19 dtype: Int64 diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 585e20275c..56f1c8b3e0 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -1183,8 +1183,8 @@ def apply( to potentially reuse a previously deployed `remote_function` from the same user defined function. - >>> @bpd.remote_function(int, float, reuse=False) - ... def minutes_to_hours(x): + >>> @bpd.remote_function(reuse=False) + ... def minutes_to_hours(x: int) -> float: ... return x/60 >>> minutes = bpd.Series([0, 30, 60, 90, 120]) @@ -1210,12 +1210,10 @@ def apply( `packages` param. >>> @bpd.remote_function( - ... str, - ... str, ... reuse=False, ... packages=["cryptography"], ... ) - ... def get_hash(input): + ... def get_hash(input: str) -> str: ... from cryptography.fernet import Fernet ... ... # handle missing value @@ -3452,8 +3450,8 @@ def mask(self, cond, other): condition is evaluated based on a complicated business logic which cannot be expressed in form of a Series. - >>> @bpd.remote_function(str, bool, reuse=False) - ... def should_mask(name): + >>> @bpd.remote_function(reuse=False) + ... def should_mask(name: str) -> bool: ... hash = 0 ... for char_ in name: ... hash += ord(char_) @@ -3971,8 +3969,8 @@ def map( It also accepts a remote function: - >>> @bpd.remote_function(str, str) - ... def my_mapper(val): + >>> @bpd.remote_function + ... def my_mapper(val: str) -> str: ... vowels = ["a", "e", "i", "o", "u"] ... if val: ... return "".join([ From 1cf934e173d1bae93aec196a36fa924fb2e65daf Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 24 May 2024 21:25:32 +0000 Subject: [PATCH 6/8] make tests robust to cloud function listing failures too --- tests/system/conftest.py | 102 +++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 1de1f54700..f7fbd5f4b6 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -18,9 +18,9 @@ import math import pathlib import textwrap +import traceback import typing from typing import Dict, Generator, Optional -import warnings import google.api_core.exceptions import google.cloud.bigquery as bigquery @@ -1097,54 +1097,54 @@ def cleanup_cloud_functions(session, cloudfunctions_client, dataset_id_permanent session.bqclient, dataset_id_permanent ) delete_count = 0 - for cloud_function in tests.system.utils.get_cloud_functions( - cloudfunctions_client, - session.bqclient.project, - session.bqclient.location, - name_prefix="bigframes-", - ): - # Ignore bigframes cloud functions referred by the remote functions in - # the permanent dataset - if cloud_function.service_config.uri in permanent_endpoints: - continue - - # Ignore the functions less than one day old - age = datetime.now() - datetime.fromtimestamp( - cloud_function.update_time.timestamp() - ) - if age.days <= 0: - continue - - # Go ahead and delete - try: - tests.system.utils.delete_cloud_function( - cloudfunctions_client, cloud_function.name + try: + for cloud_function in tests.system.utils.get_cloud_functions( + cloudfunctions_client, + session.bqclient.project, + session.bqclient.location, + name_prefix="bigframes-", + ): + # Ignore bigframes cloud functions referred by the remote functions in + # the permanent dataset + if cloud_function.service_config.uri in permanent_endpoints: + continue + + # Ignore the functions less than one day old + age = datetime.now() - datetime.fromtimestamp( + cloud_function.update_time.timestamp() ) - delete_count += 1 - if delete_count >= MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION: - break - except google.api_core.exceptions.NotFound: - # This can happen when multiple pytest sessions are running in - # parallel. Two or more sessions may discover the same cloud - # function, but only one of them would be able to delete it - # successfully, while the other instance will run into this - # exception. Ignore this exception. - pass - except Exception as exc: - # Don't fail the tests for unknown exceptions. - # - # This can happen if we are hitting GCP limits, e.g. - # google.api_core.exceptions.ResourceExhausted: 429 Quota exceeded - # for quota metric 'Per project mutation requests' and limit - # 'Per project mutation requests per minute per region' of service - # 'cloudfunctions.googleapis.com' for consumer - # 'project_number:1084210331973'. - # [reason: "RATE_LIMIT_EXCEEDED" domain: "googleapis.com" ... - # - # It can also happen occasionally with - # google.api_core.exceptions.ServiceUnavailable when there is some - # backend flakiness. - # - # Let's stop further clean up and leave it to later. - warnings.warn(f"Cloud functions cleanup failed: {str(exc)}") - break + if age.days <= 0: + continue + + # Go ahead and delete + try: + tests.system.utils.delete_cloud_function( + cloudfunctions_client, cloud_function.name + ) + delete_count += 1 + if delete_count >= MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION: + break + except google.api_core.exceptions.NotFound: + # This can happen when multiple pytest sessions are running in + # parallel. Two or more sessions may discover the same cloud + # function, but only one of them would be able to delete it + # successfully, while the other instance will run into this + # exception. Ignore this exception. + pass + except Exception as exc: + # Don't fail the tests for unknown exceptions. + # + # This can happen if we are hitting GCP limits, e.g. + # google.api_core.exceptions.ResourceExhausted: 429 Quota exceeded + # for quota metric 'Per project mutation requests' and limit + # 'Per project mutation requests per minute per region' of service + # 'cloudfunctions.googleapis.com' for consumer + # 'project_number:1084210331973'. + # [reason: "RATE_LIMIT_EXCEEDED" domain: "googleapis.com" ... + # + # It can also happen occasionally with + # google.api_core.exceptions.ServiceUnavailable when there is some + # backend flakiness. + # + # Let's stop further clean up and leave it to later. + traceback.print_exception(exc) From b3d8223dc7c3effe6d3410a95a8a40cb3f2b4677 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 30 May 2024 14:52:21 +0000 Subject: [PATCH 7/8] remove unused bigquery_client argument --- bigframes/functions/remote_function.py | 21 ++++------------ tests/system/small/test_remote_function.py | 28 ++++++++++++++-------- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 101b6ab7fd..bef537f080 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1140,27 +1140,14 @@ def try_delattr(attr): def read_gbq_function( function_name: str, - session: Optional[Session] = None, - bigquery_client: Optional[bigquery.Client] = None, + *, + session: Session, ): """ Read an existing BigQuery function and prepare it for use in future queries. """ - - # A BigQuery client is required to perform BQ operations - if not bigquery_client and session: - bigquery_client = session.bqclient - if not bigquery_client: - raise ValueError( - "A bigquery client must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" - ) - - if session: - ibis_client = session.ibis_client - else: - # Construct an ibis client so that directly executing the function can work. - ibis_client = ibis.bigquery.connect(client=bigquery_client) + bigquery_client = session.bqclient + ibis_client = session.ibis_client try: routine_ref = get_routine_reference(function_name, bigquery_client, session) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 096a268441..af7f2a5554 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -537,12 +537,12 @@ def add_one(x): @pytest.mark.flaky(retries=2, delay=120) -def test_read_gbq_function_detects_invalid_function(bigquery_client, dataset_id): +def test_read_gbq_function_detects_invalid_function(session, dataset_id): dataset_ref = bigquery.DatasetReference.from_string(dataset_id) with pytest.raises(ValueError) as e: rf.read_gbq_function( str(dataset_ref.routine("not_a_function")), - bigquery_client=bigquery_client, + session=session, ) assert "Unknown function" in str(e.value) @@ -550,6 +550,7 @@ def test_read_gbq_function_detects_invalid_function(bigquery_client, dataset_id) @pytest.mark.flaky(retries=2, delay=120) def test_read_gbq_function_like_original( + session, bigquery_client, bigqueryconnection_client, cloudfunctions_client, @@ -577,7 +578,7 @@ def square1(x): square2 = rf.read_gbq_function( function_name=square1.bigframes_remote_function, - bigquery_client=bigquery_client, + session=session, ) # The newly-created function (square1) should have a remote function AND a @@ -607,7 +608,7 @@ def square1(x): @pytest.mark.flaky(retries=2, delay=120) -def test_read_gbq_function_reads_udfs(bigquery_client, dataset_id): +def test_read_gbq_function_reads_udfs(session, bigquery_client, dataset_id): dataset_ref = bigquery.DatasetReference.from_string(dataset_id) arg = bigquery.RoutineArgument( name="x", @@ -633,7 +634,8 @@ def test_read_gbq_function_reads_udfs(bigquery_client, dataset_id): # Create the routine in BigQuery and read it back using read_gbq_function. bigquery_client.create_routine(routine, exists_ok=True) square = rf.read_gbq_function( - str(routine.reference), bigquery_client=bigquery_client + str(routine.reference), + session=session, ) # It should point to the named routine and yield the expected results. @@ -658,7 +660,9 @@ def test_read_gbq_function_reads_udfs(bigquery_client, dataset_id): @pytest.mark.flaky(retries=2, delay=120) -def test_read_gbq_function_enforces_explicit_types(bigquery_client, dataset_id): +def test_read_gbq_function_enforces_explicit_types( + session, bigquery_client, dataset_id +): dataset_ref = bigquery.DatasetReference.from_string(dataset_id) typed_arg = bigquery.RoutineArgument( name="x", @@ -702,18 +706,22 @@ def test_read_gbq_function_enforces_explicit_types(bigquery_client, dataset_id): bigquery_client.create_routine(neither_type_specified, exists_ok=True) rf.read_gbq_function( - str(both_types_specified.reference), bigquery_client=bigquery_client + str(both_types_specified.reference), + session=session, ) rf.read_gbq_function( - str(only_return_type_specified.reference), bigquery_client=bigquery_client + str(only_return_type_specified.reference), + session=session, ) with pytest.raises(ValueError): rf.read_gbq_function( - str(only_arg_type_specified.reference), bigquery_client=bigquery_client + str(only_arg_type_specified.reference), + session=session, ) with pytest.raises(ValueError): rf.read_gbq_function( - str(neither_type_specified.reference), bigquery_client=bigquery_client + str(neither_type_specified.reference), + session=session, ) From b1ae0b978bb4f9519474182aa49b3e9082de0956 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 30 May 2024 14:55:49 +0000 Subject: [PATCH 8/8] add test that function can be called directly --- tests/system/small/test_remote_function.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index af7f2a5554..c91dd2ce5e 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -607,6 +607,13 @@ def square1(x): assert_pandas_df_equal(s1_result.to_pandas(), s2_result.to_pandas()) +@pytest.mark.flaky(retries=2, delay=120) +def test_read_gbq_function_runs_existing_udf(session, bigquery_client, dataset_id): + func = session.read_gbq_function("bqutil.fn.cw_lower_case_ascii_only") + got = func("AURÉLIE") + assert got == "aurÉlie" + + @pytest.mark.flaky(retries=2, delay=120) def test_read_gbq_function_reads_udfs(session, bigquery_client, dataset_id): dataset_ref = bigquery.DatasetReference.from_string(dataset_id)