
feat: support bytes type in remote_function #761

Merged: 11 commits, Jun 7, 2024
15 changes: 11 additions & 4 deletions bigframes/core/blocks.py
@@ -2339,12 +2339,19 @@ def _get_rows_as_json_values(self) -> Block:
index_columns_count = len(self.index_columns)

# column references to form the array of values for the row
column_references_csv = sql.csv(
[sql.cast_as_string(col) for col in self.expr.column_ids]
)
column_types = list(self.index.dtypes) + list(self.dtypes)
column_references = []
for type_, col in zip(column_types, self.expr.column_ids):
if isinstance(type_, pd.ArrowDtype) and pa.types.is_binary(
type_.pyarrow_dtype
):
column_references.append(sql.to_json_string(col))
else:
column_references.append(sql.cast_as_string(col))

column_references_csv = sql.csv(column_references)

# types of the columns to serialize for the row
column_types = list(self.index.dtypes) + list(self.dtypes)
column_types_csv = sql.csv(
[sql.simple_literal(str(typ)) for typ in column_types]
)
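Why the branch above matters: BYTES columns cannot simply be cast to STRING, since the payload may not be valid UTF-8, so they are serialized with TO_JSON_STRING, which represents BYTES as a base64-encoded JSON string. A minimal standalone sketch of the same branch, using stand-in helpers rather than the actual bigframes.core.sql module:

# Sketch of the per-column branch, with stand-in helpers (the real ones live
# in bigframes.core.sql); assumes backtick-quoted identifiers.
import pandas as pd
import pyarrow as pa


def serializer_for(dtype, col: str) -> str:
    # Binary columns surface as pd.ArrowDtype(pa.binary()); they go through
    # TO_JSON_STRING so the payload is base64-encoded, instead of a cast that
    # would fail for bytes that are not valid UTF-8.
    if isinstance(dtype, pd.ArrowDtype) and pa.types.is_binary(dtype.pyarrow_dtype):
        return f"TO_JSON_STRING(`{col}`)"
    return f"CAST(`{col}` AS STRING)"


print(serializer_for(pd.ArrowDtype(pa.binary()), "payload"))  # TO_JSON_STRING(`payload`)
print(serializer_for(pd.Int64Dtype(), "id"))                  # CAST(`id` AS STRING)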
6 changes: 6 additions & 0 deletions bigframes/core/sql.py
@@ -96,6 +96,12 @@ def cast_as_string(column_name: str) -> str:
return f"CAST({identifier(column_name)} AS STRING)"


def to_json_string(column_name: str) -> str:
"""Return a string representing JSON version of a column."""

return f"TO_JSON_STRING({identifier(column_name)})"


def csv(values: Iterable[str]) -> str:
"""Return a string of comma separated values."""
return ", ".join(values)
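Because BigQuery's JSON encoding renders BYTES as base64, the value produced by to_json_string arrives as a quoted base64 string in the serialized row, and the consuming side can recover the raw bytes with a base64 decode. A hedged sketch of that round trip; the literal and variable names below are illustrative, not taken from the bigframes cloud function template:

# Illustrative receiving side: decode a JSON-serialized BYTES cell.
import base64
import json

serialized_cell = '"aGVsbG8gd29ybGQ="'  # assumed output shape of TO_JSON_STRING for b"hello world"
raw_bytes = base64.b64decode(json.loads(serialized_cell))
print(raw_bytes)  # b'hello world'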
27 changes: 24 additions & 3 deletions bigframes/dataframe.py
@@ -3313,22 +3313,43 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
# Early check whether the dataframe dtypes are currently supported
# in the remote function
# NOTE: Keep in sync with the value converters used in the gcf code
# generated in generate_cloud_function_main_code in remote_function.py
# generated in remote_function_template.py
remote_function_supported_dtypes = (
bigframes.dtypes.INT_DTYPE,
bigframes.dtypes.FLOAT_DTYPE,
bigframes.dtypes.BOOL_DTYPE,
bigframes.dtypes.BYTES_DTYPE,
bigframes.dtypes.STRING_DTYPE,
)
supported_dtypes_types = tuple(
type(dtype) for dtype in remote_function_supported_dtypes
type(dtype)
for dtype in remote_function_supported_dtypes
if not isinstance(dtype, pandas.ArrowDtype)
)
# Check ArrowDtype separately since multiple BigQuery types map to
# ArrowDtype, including BYTES and TIMESTAMP.
supported_arrow_types = tuple(
dtype.pyarrow_dtype
for dtype in remote_function_supported_dtypes
if isinstance(dtype, pandas.ArrowDtype)
)
supported_dtypes_hints = tuple(
str(dtype) for dtype in remote_function_supported_dtypes
)

for dtype in self.dtypes:
if not isinstance(dtype, supported_dtypes_types):
if (
# Not one of the pandas/numpy types.
not isinstance(dtype, supported_dtypes_types)
# And not one of the arrow types.
and not (
isinstance(dtype, pandas.ArrowDtype)
and any(
dtype.pyarrow_dtype.equals(arrow_type)
for arrow_type in supported_arrow_types
)
)
):
raise NotImplementedError(
f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1."
f" Supported dtypes are {supported_dtypes_hints}."
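The gate above works in two parts: plain pandas extension dtypes are matched by class with isinstance, while Arrow-backed dtypes such as BYTES are matched by comparing the underlying pyarrow type. A standalone sketch with assumed dtype constants (the real constants come from bigframes.dtypes):

import pandas as pd
import pyarrow as pa

SUPPORTED_DTYPES = (
    pd.Int64Dtype(),
    pd.Float64Dtype(),
    pd.BooleanDtype(),
    pd.ArrowDtype(pa.binary()),          # assumed stand-in for BYTES_DTYPE
    pd.StringDtype(storage="pyarrow"),   # assumed stand-in for STRING_DTYPE
)

plain_types = tuple(
    type(dtype) for dtype in SUPPORTED_DTYPES if not isinstance(dtype, pd.ArrowDtype)
)
arrow_types = tuple(
    dtype.pyarrow_dtype for dtype in SUPPORTED_DTYPES if isinstance(dtype, pd.ArrowDtype)
)


def is_supported(dtype) -> bool:
    # Plain pandas dtypes are matched by class; Arrow-backed dtypes (e.g. BYTES)
    # are matched by comparing the underlying pyarrow type.
    if isinstance(dtype, plain_types):
        return True
    return isinstance(dtype, pd.ArrowDtype) and any(
        dtype.pyarrow_dtype.equals(arrow_type) for arrow_type in arrow_types
    )


print(is_supported(pd.ArrowDtype(pa.binary())))         # True  (BYTES)
print(is_supported(pd.ArrowDtype(pa.timestamp("us"))))  # False (TIMESTAMP)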
3 changes: 2 additions & 1 deletion bigframes/dtypes.py
@@ -383,11 +383,12 @@ def bf_type_from_type_kind(bf_schema) -> Dict[str, Dtype]:
# Input and output types supported by BigQuery DataFrames remote functions.
# TODO(shobs): Extend the support to all types supported by BQ remote functions
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
RF_SUPPORTED_IO_PYTHON_TYPES = {bool, float, int, str}
RF_SUPPORTED_IO_PYTHON_TYPES = {bool, bytes, float, int, str}

RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
"BOOLEAN",
"BOOL",
"BYTES",
"FLOAT",
"FLOAT64",
"INT64",
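With bytes added to the Python allow-list and BYTES to the BigQuery type-kind allow-list, a UDF annotated with bytes on both input and output can pass the signature checks. A purely illustrative example of the kind of annotations this enables; the actual hint extraction and deployment happen inside the remote_function machinery:

import typing


def reverse_bytes(data: bytes) -> bytes:
    # Operates on raw payloads once BYTES round-trips through the remote function.
    return data[::-1]


print(typing.get_type_hints(reverse_bytes))  # {'data': <class 'bytes'>, 'return': <class 'bytes'>}
print(reverse_bytes(b"abc"))                 # b'cba'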
93 changes: 66 additions & 27 deletions bigframes/functions/remote_function.py
@@ -32,13 +32,15 @@
NamedTuple,
Optional,
Sequence,
Tuple,
TYPE_CHECKING,
Union,
)
import warnings

import ibis
import pandas
import pyarrow
import requests

if TYPE_CHECKING:
@@ -182,15 +184,11 @@ def create_bq_remote_function(
# Create BQ function
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
bq_function_args = []
bq_function_return_type = third_party_ibis_bqtypes.BigQueryType.from_ibis(
output_type
)
bq_function_return_type = output_type

# We are expecting the input type annotations to be 1:1 with the input args
for idx, name in enumerate(input_args):
bq_function_args.append(
f"{name} {third_party_ibis_bqtypes.BigQueryType.from_ibis(input_types[idx])}"
)
for name, type_ in zip(input_args, input_types):
bq_function_args.append(f"{name} {type_}")

remote_function_options = {
"endpoint": endpoint,
@@ -259,16 +257,31 @@ def get_cloud_function_endpoint(self, name):
return None

def generate_cloud_function_code(
self, def_, directory, package_requirements=None, is_row_processor=False
self,
def_,
directory,
*,
input_types: Tuple[str],
output_type: str,
package_requirements=None,
is_row_processor=False,
):
"""Generate the cloud function code for a given user defined function."""
"""Generate the cloud function code for a given user defined function.

Args:
input_types (tuple[str]):
Types of the input arguments in BigQuery SQL data type names.
output_type (str):
Type of the output scalar as a BigQuery SQL data type name.
"""

# requirements.txt
requirements = ["cloudpickle >= 2.1.0"]
if is_row_processor:
# bigframes remote function will send an entire row of data as json,
# which would be converted to a pandas series and processed
requirements.append(f"pandas=={pandas.__version__}")
requirements.append(f"pyarrow=={pyarrow.__version__}")
if package_requirements:
requirements.extend(package_requirements)
requirements = sorted(requirements)
@@ -278,26 +291,45 @@ def generate_cloud_function_code(

# main.py
entry_point = bigframes.functions.remote_function_template.generate_cloud_function_main_code(
def_, directory, is_row_processor
def_,
directory,
input_types=input_types,
output_type=output_type,
is_row_processor=is_row_processor,
)
return entry_point

def create_cloud_function(
self,
def_,
cf_name,
*,
input_types: Tuple[str],
[Review comment (Contributor)] Do input_types and output_types need default values, when they are defined after "*"?
[Reply (Collaborator, Author)] No, the * means they are keyword-only. Without a default value they are required.

output_type: str,
package_requirements=None,
timeout_seconds=600,
max_instance_count=None,
is_row_processor=False,
vpc_connector=None,
):
"""Create a cloud function from the given user defined function."""
"""Create a cloud function from the given user defined function.

Args:
input_types (tuple[str]):
Types of the input arguments in BigQuery SQL data type names.
output_type (str):
Type of the output scalar as a BigQuery SQL data type name.
"""

# Build and deploy folder structure containing cloud function
with tempfile.TemporaryDirectory() as directory:
entry_point = self.generate_cloud_function_code(
def_, directory, package_requirements, is_row_processor
def_,
directory,
package_requirements=package_requirements,
input_types=input_types,
output_type=output_type,
is_row_processor=is_row_processor,
)
archive_path = shutil.make_archive(directory, "zip", directory)

@@ -444,11 +476,13 @@ def provision_bq_remote_function(
cf_endpoint = self.create_cloud_function(
def_,
cloud_function_name,
package_requirements,
cloud_function_timeout,
cloud_function_max_instance_count,
is_row_processor,
cloud_function_vpc_connector,
input_types=input_types,
output_type=output_type,
package_requirements=package_requirements,
timeout_seconds=cloud_function_timeout,
max_instance_count=cloud_function_max_instance_count,
is_row_processor=is_row_processor,
vpc_connector=cloud_function_vpc_connector,
)
else:
logger.info(f"Cloud function {cloud_function_name} already exists.")
@@ -957,16 +991,21 @@ def try_delattr(attr):

rf_name, cf_name = remote_function_client.provision_bq_remote_function(
func,
ibis_signature.input_types,
ibis_signature.output_type,
reuse,
name,
packages,
max_batching_rows,
cloud_function_timeout,
cloud_function_max_instances,
is_row_processor,
cloud_function_vpc_connector,
input_types=tuple(
third_party_ibis_bqtypes.BigQueryType.from_ibis(type_)
for type_ in ibis_signature.input_types
),
output_type=third_party_ibis_bqtypes.BigQueryType.from_ibis(
ibis_signature.output_type
),
reuse=reuse,
name=name,
package_requirements=packages,
max_batching_rows=max_batching_rows,
cloud_function_timeout=cloud_function_timeout,
cloud_function_max_instance_count=cloud_function_max_instances,
is_row_processor=is_row_processor,
cloud_function_vpc_connector=cloud_function_vpc_connector,
)

# TODO: Move ibis logic to compiler step
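The remote_function.py changes convert the ibis signature to BigQuery SQL type names once at the call site via BigQueryType.from_ibis, then thread those strings through as keyword-only arguments so the cloud function code generator and the CREATE FUNCTION DDL see the same names. A rough sketch of how the pre-converted strings end up in the DDL argument list; the helper name below is illustrative:

from typing import Sequence, Tuple


def build_function_signature(
    input_args: Sequence[str], input_types: Tuple[str, ...], output_type: str
) -> str:
    # Mirrors the zip-based pairing in create_bq_remote_function: each argument
    # name is paired with its already-converted BigQuery SQL type name.
    args = ", ".join(f"{name} {type_}" for name, type_ in zip(input_args, input_types))
    return f"({args}) RETURNS {output_type}"


print(build_function_signature(["data"], ("BYTES",), "BYTES"))
# (data BYTES) RETURNS BYTES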