diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py
index 1833ac489c..2c9dd0cb31 100644
--- a/bigframes/functions/_function_client.py
+++ b/bigframes/functions/_function_client.py
@@ -202,6 +202,9 @@ def provision_bq_managed_function(
         output_type: str,
         name: Optional[str],
         packages: Optional[Sequence[str]],
+        max_batching_rows: Optional[int],
+        container_cpu: Optional[float],
+        container_memory: Optional[str],
         is_row_processor: bool,
         bq_connection_id,
         *,
@@ -234,6 +237,12 @@
             "runtime_version": _MANAGED_FUNC_PYTHON_VERSION,
             "entry_point": "bigframes_handler",
         }
+        if max_batching_rows:
+            managed_function_options["max_batching_rows"] = max_batching_rows
+        if container_cpu:
+            managed_function_options["container_cpu"] = container_cpu
+        if container_memory:
+            managed_function_options["container_memory"] = container_memory
 
         # Augment user package requirements with any internal package
         # requirements.
diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py
index a7910127e4..22e6981c38 100644
--- a/bigframes/functions/_function_session.py
+++ b/bigframes/functions/_function_session.py
@@ -702,6 +702,9 @@ def udf(
         bigquery_connection: Optional[str] = None,
         name: Optional[str] = None,
         packages: Optional[Sequence[str]] = None,
+        max_batching_rows: Optional[int] = None,
+        container_cpu: Optional[float] = None,
+        container_memory: Optional[str] = None,
     ):
         """Decorator to turn a Python user defined function (udf) into a
         BigQuery managed function.
@@ -769,6 +772,21 @@ def udf(
                 dependency is added to the `requirements.txt` as is, and
                 can be of the form supported in
                 https://pip.pypa.io/en/stable/reference/requirements-file-format/.
+            max_batching_rows (int, Optional):
+                The maximum number of rows in each batch. If you specify
+                max_batching_rows, BigQuery determines the number of rows in a
+                batch, up to the max_batching_rows limit. If max_batching_rows
+                is not specified, the number of rows to batch is determined
+                automatically.
+            container_cpu (float, Optional):
+                The CPU limits for containers that run Python UDFs. By default,
+                the CPU allocated is 0.33 vCPU. See details at
+                https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
+            container_memory (str, Optional):
+                The memory limits for containers that run Python UDFs. By
+                default, the memory allocated to each container instance is
+                512 MiB. See details at
+                https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
             """
 
         warnings.warn("udf is in preview.", category=bfe.PreviewWarning, stacklevel=5)
@@ -854,6 +872,9 @@ def wrapper(func):
                 output_type=udf_sig.sql_output_type,
                 name=name,
                 packages=packages,
+                max_batching_rows=max_batching_rows,
+                container_cpu=container_cpu,
+                container_memory=container_memory,
                 is_row_processor=is_row_processor,
                 bq_connection_id=bq_connection_id,
             )
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index f163d25757..76e0f8719b 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -142,6 +142,9 @@ def udf(
     bigquery_connection: Optional[str] = None,
     name: str,
     packages: Optional[Sequence[str]] = None,
+    max_batching_rows: Optional[int] = None,
+    container_cpu: Optional[float] = None,
+    container_memory: Optional[str] = None,
 ):
     return global_session.with_default_session(
         bigframes.session.Session.udf,
@@ -151,6 +154,9 @@
         bigquery_connection=bigquery_connection,
         name=name,
         packages=packages,
+        max_batching_rows=max_batching_rows,
+        container_cpu=container_cpu,
+        container_memory=container_memory,
     )
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 2c9dea2d19..d27cd48cdd 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -1686,6 +1686,9 @@ def udf(
         bigquery_connection: Optional[str] = None,
         name: str,
         packages: Optional[Sequence[str]] = None,
+        max_batching_rows: Optional[int] = None,
+        container_cpu: Optional[float] = None,
+        container_memory: Optional[str] = None,
     ):
         """Decorator to turn a Python user defined function (udf) into a
         [BigQuery managed user-defined function](https://cloud.google.com/bigquery/docs/user-defined-functions-python).
@@ -1807,6 +1810,21 @@ def udf(
                 dependency is added to the `requirements.txt` as is, and
                 can be of the form supported in
                 https://pip.pypa.io/en/stable/reference/requirements-file-format/.
+            max_batching_rows (int, Optional):
+                The maximum number of rows in each batch. If you specify
+                max_batching_rows, BigQuery determines the number of rows in a
+                batch, up to the max_batching_rows limit. If max_batching_rows
+                is not specified, the number of rows to batch is determined
+                automatically.
+            container_cpu (float, Optional):
+                The CPU limits for containers that run Python UDFs. By default,
+                the CPU allocated is 0.33 vCPU. See details at
+                https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
+            container_memory (str, Optional):
+                The memory limits for containers that run Python UDFs. By
+                default, the memory allocated to each container instance is
+                512 MiB. See details at
+                https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
         Returns:
             collections.abc.Callable:
                 A managed function object pointing to the cloud assets created
@@ -1828,6 +1846,9 @@ def udf(
             bigquery_connection=bigquery_connection,
             name=name,
             packages=packages,
+            max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         )
 
     def read_gbq_function(
diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py
index ad5849eb2f..c58610d1ff 100644
--- a/tests/system/large/functions/test_managed_function.py
+++ b/tests/system/large/functions/test_managed_function.py
@@ -549,3 +549,101 @@ def foo(x: int) -> int:
     finally:
         # Clean up the gcp assets created for the managed function.
         cleanup_function_assets(foo, session.bqclient, ignore_failures=False)
+
+
+def test_managed_function_options(session, dataset_id, scalars_dfs):
+    try:
+
+        def multiply_five(x: int) -> int:
+            return x * 5
+
+        mf_multiply_five = session.udf(
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+            max_batching_rows=100,
+            container_cpu=2,
+            container_memory="2Gi",
+        )(multiply_five)
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_int64_df = scalars_df["int64_col"]
+        bf_int64_df_filtered = bf_int64_df.dropna()
+        bf_result = bf_int64_df_filtered.apply(mf_multiply_five).to_pandas()
+
+        pd_int64_df = scalars_pandas_df["int64_col"]
+        pd_int64_df_filtered = pd_int64_df.dropna()
+        pd_result = pd_int64_df_filtered.apply(multiply_five)
+
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+        # Make sure the read_gbq_function path works for this function.
+        multiply_five_ref = session.read_gbq_function(
+            function_name=mf_multiply_five.bigframes_bigquery_function,  # type: ignore
+        )
+        assert mf_multiply_five.bigframes_bigquery_function == multiply_five_ref.bigframes_bigquery_function  # type: ignore
+
+        bf_result = bf_int64_df_filtered.apply(multiply_five_ref).to_pandas()
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+        # Retrieve the routine and validate its runtime configuration.
+        routine = session.bqclient.get_routine(
+            mf_multiply_five.bigframes_bigquery_function
+        )
+
+        # TODO(jialuo): Use the newly exposed class properties instead of
+        # accessing the hidden _properties after resolve of this issue:
+        # https://github.com/googleapis/python-bigquery/issues/2240.
+        assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "100"
+        assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 2
+        assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi"
+
+    finally:
+        # Clean up the gcp assets created for the managed function.
+        cleanup_function_assets(
+            mf_multiply_five, session.bqclient, ignore_failures=False
+        )
+
+
+def test_managed_function_options_errors(session, dataset_id):
+    def foo(x: int) -> int:
+        return 0
+
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest,
+        # For CPU value >= 1.0, the value must be one of [1, 2, ...].
+        match="Invalid container_cpu function OPTIONS value",
+    ):
+        session.udf(
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+            max_batching_rows=100,
+            container_cpu=2.5,
+            container_memory="2Gi",
+        )(foo)
+
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest,
+        # For less than 1.0 CPU, the value must be no less than 0.33.
+        match="Invalid container_cpu function OPTIONS value",
+    ):
+        session.udf(
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+            max_batching_rows=100,
+            container_cpu=0.10,
+            container_memory="512Mi",
+        )(foo)
+
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest,
+        # For 2.00 CPU, the memory must be in the range of [256Mi, 8Gi].
+        match="Invalid container_memory function OPTIONS value",
+    ):
+        session.udf(
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+            max_batching_rows=100,
+            container_cpu=2,
+            container_memory="64Mi",
+        )(foo)
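
For context, a minimal usage sketch (not part of the diff) of how the three new options surface through the public `bigframes.pandas.udf` entry point added above. It assumes bigframes is installed with a default session and project configured; "my_dataset" and "multiply_five" are placeholder names, and the option values mirror those exercised in the system test.

# Usage sketch (illustrative only, not part of the diff). Assumes a
# configured default BigQuery session; "my_dataset" and "multiply_five"
# are placeholders.
import bigframes.pandas as bpd


@bpd.udf(
    dataset="my_dataset",  # placeholder dataset that will hold the routine
    name="multiply_five",  # placeholder routine name
    max_batching_rows=100,  # BigQuery batches at most 100 rows per UDF call
    container_cpu=2,  # 2 vCPUs per container instance
    container_memory="2Gi",  # 2 GiB of memory per container instance
)
def multiply_five(x: int) -> int:
    return x * 5


# Apply the managed function to a BigQuery DataFrames Series.
df = bpd.DataFrame({"x": [1, 2, 3]})
print(df["x"].apply(multiply_five).to_pandas())

Per the provisioning change in `_function_client.py`, each keyword argument is forwarded into `managed_function_options` only when set, so omitting any of them keeps the BigQuery defaults (automatic batching, 0.33 vCPU, 512 MiB).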