
Commit 36578ab

feat: support gcf max instance count in remote_function (#657)
* feat: support gcf max instance count in `remote_function`
* fix comment in test
* enable back the retry annotation
1 parent c8d4e23 commit 36578ab
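
In short, the commit threads a new `cloud_function_max_instances` option from the public `remote_function` entry points (session and `bigframes.pandas`) down to the Cloud Functions service config of the deployed function. A minimal usage sketch, following the call shape of the test added below (the function body is illustrative):

    # Hypothetical usage; `session` is a BigQuery DataFrames Session.
    @session.remote_function([int], int, reuse=False, cloud_function_max_instances=100)
    def square(x):
        return x * x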

File tree

4 files changed, +79 -5 lines changed

bigframes/functions/remote_function.py

Lines changed: 27 additions & 5 deletions
@@ -342,7 +342,12 @@ def generate_cloud_function_code(self, def_, dir, package_requirements=None):
         return entry_point
 
     def create_cloud_function(
-        self, def_, cf_name, package_requirements=None, cloud_function_timeout=600
+        self,
+        def_,
+        cf_name,
+        package_requirements=None,
+        timeout_seconds=600,
+        max_instance_count=None,
     ):
         """Create a cloud function from the given user defined function."""
 
@@ -411,14 +416,16 @@ def create_cloud_function(
         )
         function.service_config = functions_v2.ServiceConfig()
         function.service_config.available_memory = "1024M"
-        if cloud_function_timeout is not None:
-            if cloud_function_timeout > 1200:
+        if timeout_seconds is not None:
+            if timeout_seconds > 1200:
                 raise ValueError(
                     "BigQuery remote function can wait only up to 20 minutes"
                     ", see for more details "
                     "https://cloud.google.com/bigquery/quotas#remote_function_limits."
                 )
-            function.service_config.timeout_seconds = cloud_function_timeout
+            function.service_config.timeout_seconds = timeout_seconds
+        if max_instance_count is not None:
+            function.service_config.max_instance_count = max_instance_count
         function.service_config.service_account_email = (
             self._cloud_function_service_account
         )
@@ -466,6 +473,7 @@ def provision_bq_remote_function(
         package_requirements,
         max_batching_rows,
         cloud_function_timeout,
+        cloud_function_max_instance_count,
     ):
         """Provision a BigQuery remote function."""
         # If reuse of any existing function with the same name (indicated by the
@@ -487,7 +495,11 @@ def provision_bq_remote_function(
         # Create the cloud function if it does not exist
         if not cf_endpoint:
             cf_endpoint = self.create_cloud_function(
-                def_, cloud_function_name, package_requirements, cloud_function_timeout
+                def_,
+                cloud_function_name,
+                package_requirements,
+                cloud_function_timeout,
+                cloud_function_max_instance_count,
             )
         else:
             logger.info(f"Cloud function {cloud_function_name} already exists.")
@@ -642,6 +654,7 @@ def remote_function(
     cloud_function_docker_repository: Optional[str] = None,
     max_batching_rows: Optional[int] = 1000,
     cloud_function_timeout: Optional[int] = 600,
+    cloud_function_max_instances: Optional[int] = None,
 ):
     """Decorator to turn a user defined function into a BigQuery remote function.
 
@@ -778,6 +791,14 @@ def remote_function(
             https://cloud.google.com/bigquery/quotas#remote_function_limits.
             By default BigQuery DataFrames uses a 10 minute timeout. `None`
             can be passed to let the cloud functions default timeout take effect.
+        cloud_function_max_instances (int, Optional):
+            The maximum instance count for the cloud function created. This
+            can be used to control how many cloud function instances can be
+            active at any given point of time. A lower setting can help
+            control billing spikes, while a higher setting can help support
+            processing larger scale data. When not specified, the cloud
+            function's default setting applies. For more details see
+            https://cloud.google.com/functions/docs/configuring/max-instances.
     """
     if isinstance(input_types, type):
         input_types = [input_types]
@@ -906,6 +927,7 @@ def wrapper(f):
             packages,
             max_batching_rows,
             cloud_function_timeout,
+            cloud_function_max_instances,
         )
 
         # TODO: Move ibis logic to compiler step
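
For orientation, the service-side effect of the new parameter reduces to one extra field on the Cloud Functions v2 `ServiceConfig`. A condensed sketch of the fragment above, with example values standing in for the plumbed-through arguments:

    from google.cloud import functions_v2

    function = functions_v2.Function()
    function.service_config = functions_v2.ServiceConfig()
    function.service_config.available_memory = "1024M"

    timeout_seconds = 600     # plumbed from cloud_function_timeout
    max_instance_count = 100  # plumbed from cloud_function_max_instances; None = GCF default

    if timeout_seconds is not None:
        if timeout_seconds > 1200:
            raise ValueError(
                "BigQuery remote function can wait only up to 20 minutes, see "
                "https://cloud.google.com/bigquery/quotas#remote_function_limits."
            )
        function.service_config.timeout_seconds = timeout_seconds
    if max_instance_count is not None:
        function.service_config.max_instance_count = max_instance_count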

bigframes/pandas/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -652,6 +652,7 @@ def remote_function(
     cloud_function_docker_repository: Optional[str] = None,
     max_batching_rows: Optional[int] = 1000,
     cloud_function_timeout: Optional[int] = 600,
+    cloud_function_max_instances: Optional[int] = None,
 ):
     return global_session.with_default_session(
         bigframes.session.Session.remote_function,
@@ -667,6 +668,7 @@ def remote_function(
         cloud_function_docker_repository=cloud_function_docker_repository,
         max_batching_rows=max_batching_rows,
         cloud_function_timeout=cloud_function_timeout,
+        cloud_function_max_instances=cloud_function_max_instances,
     )
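
Because `bigframes.pandas.remote_function` forwards straight to `Session.remote_function`, the new option is usable from the top-level API as well. A hypothetical example (argument values are illustrative):

    import bigframes.pandas as bpd

    @bpd.remote_function([float], float, reuse=False, cloud_function_max_instances=50)
    def half(x):
        return x / 2.0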

bigframes/session/__init__.py

Lines changed: 10 additions & 0 deletions
@@ -1466,6 +1466,7 @@ def remote_function(
         cloud_function_docker_repository: Optional[str] = None,
         max_batching_rows: Optional[int] = 1000,
         cloud_function_timeout: Optional[int] = 600,
+        cloud_function_max_instances: Optional[int] = None,
     ):
         """Decorator to turn a user defined function into a BigQuery remote function. Check out
         the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
@@ -1580,6 +1581,14 @@ def remote_function(
                 https://cloud.google.com/bigquery/quotas#remote_function_limits.
                 By default BigQuery DataFrames uses a 10 minute timeout. `None`
                 can be passed to let the cloud functions default timeout take effect.
+            cloud_function_max_instances (int, Optional):
+                The maximum instance count for the cloud function created. This
+                can be used to control how many cloud function instances can be
+                active at any given point of time. A lower setting can help
+                control billing spikes, while a higher setting can help support
+                processing larger scale data. When not specified, the cloud
+                function's default setting applies. For more details see
+                https://cloud.google.com/functions/docs/configuring/max-instances.
         Returns:
             callable: A remote function object pointing to the cloud assets created
             in the background to support the remote execution. The cloud assets can be
@@ -1603,6 +1612,7 @@ def remote_function(
             cloud_function_docker_repository=cloud_function_docker_repository,
             max_batching_rows=max_batching_rows,
             cloud_function_timeout=cloud_function_timeout,
+            cloud_function_max_instances=cloud_function_max_instances,
         )
 
     def read_gbq_function(

tests/system/large/test_remote_function.py

Lines changed: 40 additions & 0 deletions
@@ -1414,3 +1414,43 @@ def test_remote_function_gcf_timeout_max_supported_exceeded(session):
         @session.remote_function([int], int, reuse=False, cloud_function_timeout=1201)
         def square(x):
             return x * x
+
+
+@pytest.mark.parametrize(
+    ("max_instances_args", "expected_max_instances"),
+    [
+        pytest.param({}, 100, id="no-set"),
+        pytest.param({"cloud_function_max_instances": None}, 100, id="set-None"),
+        pytest.param({"cloud_function_max_instances": 1000}, 1000, id="set-explicit"),
+    ],
+)
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_max_instances(
+    session, scalars_dfs, max_instances_args, expected_max_instances
+):
+    try:
+
+        def square(x):
+            return x * x
+
+        square_remote = session.remote_function(
+            [int], int, reuse=False, **max_instances_args
+        )(square)
+
+        # Assert that the GCF is created with the intended max instance count
+        gcf = session.cloudfunctionsclient.get_function(
+            name=square_remote.bigframes_cloud_function
+        )
+        assert gcf.service_config.max_instance_count == expected_max_instances
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
+        pd_result = scalars_pandas_df["int64_too"].apply(square)
+
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square_remote
+        )
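
The 100 expected in the "no-set" and "set-None" cases is the Cloud Functions (2nd gen) default max instance count, which the test pins as applying whenever the option is omitted or passed as `None`. Outside the test harness, a deployed function's setting can be read back with a raw client; the resource name below is a placeholder:

    from google.cloud import functions_v2

    client = functions_v2.FunctionServiceClient()
    # Placeholder resource name; substitute your own project/region/function.
    name = "projects/my-project/locations/us-central1/functions/my-remote-function"
    gcf = client.get_function(name=name)
    print(gcf.service_config.max_instance_count)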
