From edc34cad8d3f1183e9c9dd28e210b4afdd3ad707 Mon Sep 17 00:00:00 2001 From: jialuo Date: Thu, 10 Jul 2025 00:05:20 +0000 Subject: [PATCH 1/5] feat: support new runtime options for udf --- bigframes/functions/_function_client.py | 12 +++ bigframes/functions/_function_session.py | 19 ++++ bigframes/pandas/__init__.py | 6 ++ bigframes/session/__init__.py | 19 ++++ .../large/functions/test_managed_function.py | 99 +++++++++++++++++++ 5 files changed, 155 insertions(+) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 1833ac489c..2b6e78f00d 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -202,6 +202,9 @@ def provision_bq_managed_function( output_type: str, name: Optional[str], packages: Optional[Sequence[str]], + max_batching_rows: Optional[int], + container_cpu: Optional[float], + container_memory: Optional[str], is_row_processor: bool, bq_connection_id, *, @@ -234,6 +237,12 @@ def provision_bq_managed_function( "runtime_version": _MANAGED_FUNC_PYTHON_VERSION, "entry_point": "bigframes_handler", } + if max_batching_rows: + managed_function_options["max_batching_rows"] = max_batching_rows + if container_cpu: + managed_function_options["container_cpu"] = container_cpu + if container_memory: + managed_function_options["container_memory"] = container_memory # Augment user package requirements with any internal package # requirements. @@ -311,6 +320,9 @@ def bigframes_handler(*args): .replace("__UDF_PLACE_HOLDER__", udf_code) ) + print("") + print(create_function_ddl) + # breakpoint() self._ensure_dataset_exists() self._create_bq_function(create_function_ddl) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index a7910127e4..82de1a6719 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -702,6 +702,9 @@ def udf( bigquery_connection: Optional[str] = None, name: Optional[str] = None, packages: Optional[Sequence[str]] = None, + max_batching_rows: Optional[int] = None, + container_cpu: Optional[float] = None, + container_memory: Optional[str] = None, ): """Decorator to turn a Python user defined function (udf) into a BigQuery managed function. @@ -769,6 +772,19 @@ def udf( dependency is added to the `requirements.txt` as is, and can be of the form supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. + max_batching_rows (int, Optional): + The maximum number of rows in each batch. If you specify + max_batching_rows, BigQuery determines the number of rows in a + batch, up to the max_batching_rows limit. If max_batching_rows + is not specified, the number of rows to batch is determined + automatically. + container_cpu (float, Optional): + The CPU limits for containers that run Python UDFs. By default, + the CPU allocated is 0.33 vCPU. + container_memory (str, Optional): + The memory limits for containers that run Python UDFs. By + default, the memory allocated to each container instance is + 512 MiB. """ warnings.warn("udf is in preview.", category=bfe.PreviewWarning, stacklevel=5) @@ -854,6 +870,9 @@ def wrapper(func): output_type=udf_sig.sql_output_type, name=name, packages=packages, + max_batching_rows=max_batching_rows, + container_cpu=container_cpu, + container_memory=container_memory, is_row_processor=is_row_processor, bq_connection_id=bq_connection_id, ) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index f163d25757..76e0f8719b 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -142,6 +142,9 @@ def udf( bigquery_connection: Optional[str] = None, name: str, packages: Optional[Sequence[str]] = None, + max_batching_rows: Optional[int] = None, + container_cpu: Optional[float] = None, + container_memory: Optional[str] = None, ): return global_session.with_default_session( bigframes.session.Session.udf, @@ -151,6 +154,9 @@ def udf( bigquery_connection=bigquery_connection, name=name, packages=packages, + max_batching_rows=max_batching_rows, + container_cpu=container_cpu, + container_memory=container_memory, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 9d113743cf..e111c3dc98 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1684,6 +1684,9 @@ def udf( bigquery_connection: Optional[str] = None, name: str, packages: Optional[Sequence[str]] = None, + max_batching_rows: Optional[int] = None, + container_cpu: Optional[float] = None, + container_memory: Optional[str] = None, ): """Decorator to turn a Python user defined function (udf) into a [BigQuery managed user-defined function](https://cloud.google.com/bigquery/docs/user-defined-functions-python). @@ -1805,6 +1808,19 @@ def udf( dependency is added to the `requirements.txt` as is, and can be of the form supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. + max_batching_rows (int, Optional): + The maximum number of rows in each batch. If you specify + max_batching_rows, BigQuery determines the number of rows in a + batch, up to the max_batching_rows limit. If max_batching_rows + is not specified, the number of rows to batch is determined + automatically. + container_cpu (float, Optional): + The CPU limits for containers that run Python UDFs. By default, + the CPU allocated is 0.33 vCPU. + container_memory (str, Optional): + The memory limits for containers that run Python UDFs. By + default, the memory allocated to each container instance is + 512 MiB. Returns: collections.abc.Callable: A managed function object pointing to the cloud assets created @@ -1826,6 +1842,9 @@ def udf( bigquery_connection=bigquery_connection, name=name, packages=packages, + max_batching_rows=max_batching_rows, + container_cpu=container_cpu, + container_memory=container_memory, ) def read_gbq_function( diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index ad5849eb2f..477c153815 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -549,3 +549,102 @@ def foo(x: int) -> int: finally: # Clean up the gcp assets created for the managed function. cleanup_function_assets(foo, session.bqclient, ignore_failures=False) + + +def test_managed_function_resources(session, dataset_id, scalars_dfs): + try: + + def multiply_five(x: int) -> int: + return x * 5 + + mf_multiply_five = session.udf( + dataset=dataset_id, + name=prefixer.create_prefix(), + max_batching_rows=100, + container_cpu=2, + container_memory="2Gi", + )(multiply_five) + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_int64_df = scalars_df["int64_col"] + bf_int64_df_filtered = bf_int64_df.dropna() + bf_result = bf_int64_df_filtered.apply(mf_multiply_five).to_pandas() + + pd_int64_df = scalars_pandas_df["int64_col"] + pd_int64_df_filtered = pd_int64_df.dropna() + pd_result = pd_int64_df_filtered.apply(multiply_five) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + # Make sure the read_gbq_function path works for this function. + multiply_five_ref = session.read_gbq_function( + function_name=mf_multiply_five.bigframes_bigquery_function, # type: ignore + ) + assert mf_multiply_five.bigframes_bigquery_function == multiply_five_ref.bigframes_bigquery_function # type: ignore + + bf_result = bf_int64_df_filtered.apply(multiply_five_ref).to_pandas() + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + # Retrieve the routine and validate its runtime configuration. + routine = session.bqclient.get_routine( + mf_multiply_five.bigframes_bigquery_function + ) + assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "100" + assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 2 + assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi" + finally: + # Clean up the gcp assets created for the managed function. + cleanup_function_assets( + mf_multiply_five, session.bqclient, ignore_failures=False + ) + + +def test_managed_function_resources_errors(session, dataset_id): + def foo(x: int) -> int: + return 0 + + with pytest.raises( + google.api_core.exceptions.BadRequest, + match=( + "Invalid container_cpu function OPTIONS value: 2.50. " + "For CPU Value >= 1.0, the value must be one of \\[1, 2, 4, 6, 8\\]" + ), + ): + session.udf( + dataset=dataset_id, + name=prefixer.create_prefix(), + max_batching_rows=100, + container_cpu=2.5, + container_memory="2Gi", + )(foo) + + with pytest.raises( + google.api_core.exceptions.BadRequest, + match=( + "Invalid container_cpu function OPTIONS value: 0.10. " + "For less than 1.0 CPU, the value must be no less than 0.33." + ), + ): + session.udf( + dataset=dataset_id, + name=prefixer.create_prefix(), + max_batching_rows=100, + container_cpu=0.10, + container_memory="512Mi", + )(foo) + + with pytest.raises( + google.api_core.exceptions.BadRequest, + match=( + "Invalid container_memory function OPTIONS value: 1Gi. " + "For 8.00 CPU, the memory must be in the range of \\[4Gi, 32Gi\\]." + ), + ): + session.udf( + dataset=dataset_id, + name=prefixer.create_prefix(), + max_batching_rows=100, + container_cpu=8, + container_memory="1Gi", + )(foo) From ac16a71c97eda7d84df5c73ed545ef4bd3695646 Mon Sep 17 00:00:00 2001 From: jialuo Date: Thu, 10 Jul 2025 00:15:10 +0000 Subject: [PATCH 2/5] fix --- bigframes/functions/_function_client.py | 3 --- bigframes/functions/_function_session.py | 3 ++- bigframes/session/__init__.py | 3 ++- tests/system/large/functions/test_managed_function.py | 2 ++ 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 2b6e78f00d..2c9dd0cb31 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -320,9 +320,6 @@ def bigframes_handler(*args): .replace("__UDF_PLACE_HOLDER__", udf_code) ) - print("") - print(create_function_ddl) - # breakpoint() self._ensure_dataset_exists() self._create_bq_function(create_function_ddl) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 82de1a6719..6fa2dd1f53 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -784,7 +784,8 @@ def udf( container_memory (str, Optional): The memory limits for containers that run Python UDFs. By default, the memory allocated to each container instance is - 512 MiB. + 512 MiB. See details at + https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits. """ warnings.warn("udf is in preview.", category=bfe.PreviewWarning, stacklevel=5) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index e111c3dc98..490a73631d 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1820,7 +1820,8 @@ def udf( container_memory (str, Optional): The memory limits for containers that run Python UDFs. By default, the memory allocated to each container instance is - 512 MiB. + 512 MiB. See details at + https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits. Returns: collections.abc.Callable: A managed function object pointing to the cloud assets created diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 477c153815..45e9fcc9ff 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -590,9 +590,11 @@ def multiply_five(x: int) -> int: routine = session.bqclient.get_routine( mf_multiply_five.bigframes_bigquery_function ) + assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "100" assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 2 assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi" + finally: # Clean up the gcp assets created for the managed function. cleanup_function_assets( From 5402ec345384e5761ba5c2718486c3f03ee66970 Mon Sep 17 00:00:00 2001 From: jialuo Date: Thu, 10 Jul 2025 17:49:01 +0000 Subject: [PATCH 3/5] fix test --- .../large/functions/test_managed_function.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 45e9fcc9ff..420e21c3f1 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -608,10 +608,8 @@ def foo(x: int) -> int: with pytest.raises( google.api_core.exceptions.BadRequest, - match=( - "Invalid container_cpu function OPTIONS value: 2.50. " - "For CPU Value >= 1.0, the value must be one of \\[1, 2, 4, 6, 8\\]" - ), + # For CPU Value >= 1.0, the value must be one of [1, 2, ...]. + match="Invalid container_cpu function OPTIONS value", ): session.udf( dataset=dataset_id, @@ -623,10 +621,8 @@ def foo(x: int) -> int: with pytest.raises( google.api_core.exceptions.BadRequest, - match=( - "Invalid container_cpu function OPTIONS value: 0.10. " - "For less than 1.0 CPU, the value must be no less than 0.33." - ), + # For less than 1.0 CPU, the value must be no less than 0.33. + match="Invalid container_cpu function OPTIONS value", ): session.udf( dataset=dataset_id, @@ -638,10 +634,8 @@ def foo(x: int) -> int: with pytest.raises( google.api_core.exceptions.BadRequest, - match=( - "Invalid container_memory function OPTIONS value: 1Gi. " - "For 8.00 CPU, the memory must be in the range of \\[4Gi, 32Gi\\]." - ), + # For 8.00 CPU, the memory must be in the range of [4Gi, 32Gi]. + match="Invalid container_memory function OPTIONS value", ): session.udf( dataset=dataset_id, From 1ac342d8610e3a0fec8c302a6143efb00e1000d4 Mon Sep 17 00:00:00 2001 From: jialuo Date: Thu, 10 Jul 2025 18:29:51 +0000 Subject: [PATCH 4/5] fix test --- tests/system/large/functions/test_managed_function.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 420e21c3f1..0fcf997b22 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -634,13 +634,13 @@ def foo(x: int) -> int: with pytest.raises( google.api_core.exceptions.BadRequest, - # For 8.00 CPU, the memory must be in the range of [4Gi, 32Gi]. + # For 2.00 CPU, the memory must be in the range of [256Mi, 8Gi]. match="Invalid container_memory function OPTIONS value", ): session.udf( dataset=dataset_id, name=prefixer.create_prefix(), max_batching_rows=100, - container_cpu=8, - container_memory="1Gi", + container_cpu=2, + container_memory="64Mi", )(foo) From bbe17e7b1183535fa5eaefd31fffd48e5cb68455 Mon Sep 17 00:00:00 2001 From: jialuo Date: Tue, 15 Jul 2025 22:36:07 +0000 Subject: [PATCH 5/5] resolve the comments --- bigframes/functions/_function_session.py | 3 ++- bigframes/session/__init__.py | 3 ++- tests/system/large/functions/test_managed_function.py | 7 +++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 6fa2dd1f53..22e6981c38 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -780,7 +780,8 @@ def udf( automatically. container_cpu (float, Optional): The CPU limits for containers that run Python UDFs. By default, - the CPU allocated is 0.33 vCPU. + the CPU allocated is 0.33 vCPU. See details at + https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits. container_memory (str, Optional): The memory limits for containers that run Python UDFs. By default, the memory allocated to each container instance is diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 06fa294149..d27cd48cdd 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1818,7 +1818,8 @@ def udf( automatically. container_cpu (float, Optional): The CPU limits for containers that run Python UDFs. By default, - the CPU allocated is 0.33 vCPU. + the CPU allocated is 0.33 vCPU. See details at + https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits. container_memory (str, Optional): The memory limits for containers that run Python UDFs. By default, the memory allocated to each container instance is diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 0fcf997b22..c58610d1ff 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -551,7 +551,7 @@ def foo(x: int) -> int: cleanup_function_assets(foo, session.bqclient, ignore_failures=False) -def test_managed_function_resources(session, dataset_id, scalars_dfs): +def test_managed_function_options(session, dataset_id, scalars_dfs): try: def multiply_five(x: int) -> int: @@ -591,6 +591,9 @@ def multiply_five(x: int) -> int: mf_multiply_five.bigframes_bigquery_function ) + # TODO(jialuo): Use the newly exposed class properties instead of + # accessing the hidden _properties after resolve of this issue: + # https://github.com/googleapis/python-bigquery/issues/2240. assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "100" assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 2 assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi" @@ -602,7 +605,7 @@ def multiply_five(x: int) -> int: ) -def test_managed_function_resources_errors(session, dataset_id): +def test_managed_function_options_errors(session, dataset_id): def foo(x: int) -> int: return 0