diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py
index 1dad128599..86ba16e347 100644
--- a/bigframes/core/compile/aggregate_compiler.py
+++ b/bigframes/core/compile/aggregate_compiler.py
@@ -331,7 +331,7 @@ def _(
     op: agg_ops.RankOp, column: ibis_types.Column, window=None
 ) -> ibis_types.IntegerValue:
     # Ibis produces 0-based ranks, while pandas creates 1-based ranks
-    return _apply_window_if_present(column.rank(), window) + 1
+    return _apply_window_if_present(ibis.rank(), window) + 1
 
 
 @compile_unary_agg.register
diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py
index 969437939f..7245689aae 100644
--- a/bigframes/core/compile/compiled.py
+++ b/bigframes/core/compile/compiled.py
@@ -1099,17 +1099,14 @@ def _to_ibis_expr(
         if not columns:
             return ibis.memtable([])
 
+        # Make sure we don't have any unbound (deferred) columns.
+        table = self._table.select(columns)
+
         # Make sure all dtypes are the "canonical" ones for BigFrames. This is
         # important for operations like UNION where the schema must match.
-        table = self._table.select(
-            bigframes.dtypes.ibis_value_to_canonical_type(
-                column.resolve(self._table)
-                # TODO(https://github.com/ibis-project/ibis/issues/7613): use
-                # public API to refer to Deferred type.
-                if isinstance(column, ibis.common.deferred.Deferred)
-                else column
-            )
-            for column in columns
+        table = table.select(
+            bigframes.dtypes.ibis_value_to_canonical_type(table[column])
+            for column in table.columns
         )
         base_table = table
         if self._reduced_predicate is not None:
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index 6e3bc25c47..8a2055ef7f 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -23,7 +23,6 @@
 import geopandas as gpd  # type: ignore
 import google.cloud.bigquery as bigquery
 import ibis
-from ibis.backends.bigquery.datatypes import BigQueryType
 import ibis.expr.datatypes as ibis_dtypes
 from ibis.expr.datatypes.core import dtype as python_type_to_bigquery_type
 import ibis.expr.types as ibis_types
@@ -33,6 +32,7 @@
 
 import bigframes.constants as constants
 import third_party.bigframes_vendored.google_cloud_bigquery._pandas_helpers as gcb3p_pandas_helpers
+import third_party.bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
 import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
 
 # Type hints for Pandas dtypes supported by BigQuery DataFrame
@@ -643,4 +643,4 @@ def ibis_type_from_python_type(t: type) -> ibis_dtypes.DataType:
 def ibis_type_from_type_kind(tk: bigquery.StandardSqlTypeNames) -> ibis_dtypes.DataType:
     if tk not in SUPPORTED_IO_BIGQUERY_TYPEKINDS:
         raise UnsupportedTypeError(tk, SUPPORTED_IO_BIGQUERY_TYPEKINDS)
-    return BigQueryType.to_ibis(tk)
+    return third_party_ibis_bqtypes.BigQueryType.to_ibis(tk)
diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py
index c7bb5d92c6..af4c4b138a 100644
--- a/bigframes/functions/remote_function.py
+++ b/bigframes/functions/remote_function.py
@@ -14,7 +14,6 @@
 
 from __future__ import annotations
 
-import functools
 import hashlib
 import inspect
 import logging
@@ -28,6 +27,7 @@
 import textwrap
 from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING
 
+import ibis
 import requests
 
 if TYPE_CHECKING:
@@ -43,15 +43,12 @@
     resourcemanager_v3,
 )
 import google.iam.v1
-from ibis.backends.bigquery.compiler import compiles
-from ibis.backends.bigquery.datatypes import BigQueryType
 from ibis.expr.datatypes.core import DataType as IbisDataType
-import ibis.expr.operations as ops
-import ibis.expr.rules as rlz
 
 from bigframes import clients
 import bigframes.constants as constants
 import bigframes.dtypes
+import third_party.bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
 
 logger = logging.getLogger(__name__)
 
@@ -173,12 +170,14 @@ def create_bq_remote_function(
         # Create BQ function
         # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
         bq_function_args = []
-        bq_function_return_type = BigQueryType.from_ibis(output_type)
+        bq_function_return_type = third_party_ibis_bqtypes.BigQueryType.from_ibis(
+            output_type
+        )
 
         # We are expecting the input type annotations to be 1:1 with the input args
         for idx, name in enumerate(input_args):
             bq_function_args.append(
-                f"{name} {BigQueryType.from_ibis(input_types[idx])}"
+                f"{name} {third_party_ibis_bqtypes.BigQueryType.from_ibis(input_types[idx])}"
             )
         create_function_ddl = f"""
             CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}({','.join(bq_function_args)})
@@ -515,33 +514,10 @@ def get_remote_function_specs(self, remote_function_name):
         return (http_endpoint, bq_connection)
 
 
-def remote_function_node(
-    routine_ref: bigquery.RoutineReference, ibis_signature: IbisSignature
-):
-    """Creates an Ibis node representing a remote function call."""
-
-    fields = {
-        name: rlz.ValueOf(None if type_ == "ANY TYPE" else type_)
-        for name, type_ in zip(
-            ibis_signature.parameter_names, ibis_signature.input_types
-        )
-    }
-
-    fields["dtype"] = ibis_signature.output_type  # type: ignore
-    fields["shape"] = rlz.shape_like("args")
-
-    node = type(routine_ref_to_string_for_query(routine_ref), (ops.ValueOp,), fields)  # type: ignore
-
-    @compiles(node)
-    def compile_node(t, op):
-        return "{}({})".format(node.__name__, ", ".join(map(t.translate, op.args)))
-
-    def f(*args, **kwargs):
-        return node(*args, **kwargs).to_expr()
-
-    f.bigframes_remote_function = str(routine_ref)  # type: ignore
-
-    return f
+class UnsupportedTypeError(ValueError):
+    def __init__(self, type_, supported_types):
+        self.type = type_
+        self.supported_types = supported_types
 
 
 def ibis_signature_from_python_signature(
@@ -831,14 +807,16 @@ def wrapper(f):
             packages,
         )
 
-        node = remote_function_node(dataset_ref.routine(rf_name), ibis_signature)
-
-        node = functools.wraps(f)(node)
-        node.__signature__ = signature
+        node = ibis.udf.scalar.builtin(
+            f,
+            name=rf_name,
+            schema=f"{dataset_ref.project}.{dataset_ref.dataset_id}",
+            signature=(ibis_signature.input_types, ibis_signature.output_type),
+        )
         node.bigframes_cloud_function = (
             remote_function_client.get_cloud_function_fully_qualified_name(cf_name)
         )
-
+        node.bigframes_remote_function = str(dataset_ref.routine(rf_name))  # type: ignore
         return node
 
     return wrapper
@@ -888,4 +866,17 @@ def read_gbq_function(
             f"{constants.FEEDBACK_LINK}"
         )
 
-    return remote_function_node(routine_ref, ibis_signature)
+    # The name "args" conflicts with the Ibis operator, so we use
+    # non-standard names for the arguments here.
+    def node(*ignored_args, **ignored_kwargs):
+        f"""Remote function {str(routine_ref)}."""
+
+    node.__name__ = routine_ref.routine_id
+    node = ibis.udf.scalar.builtin(
+        node,
+        name=routine_ref.routine_id,
+        schema=f"{routine_ref.project}.{routine_ref.dataset_id}",
+        signature=(ibis_signature.input_types, ibis_signature.output_type),
+    )
+    node.bigframes_remote_function = str(routine_ref)  # type: ignore
+    return node
diff --git a/noxfile.py b/noxfile.py
index 259943aaa4..91d26cf695 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -565,12 +565,12 @@ def prerelease(session: nox.sessions.Session, tests_path):
     # session.install(
    #     "--upgrade",
    #     "-e",  # Use -e so that py.typed file is included.
-    #     "git+https://github.com/ibis-project/ibis.git@7.x.x#egg=ibis-framework",
+    #     "git+https://github.com/ibis-project/ibis.git#egg=ibis-framework",
     # )
     session.install(
         "--upgrade",
-        # "--pre",
-        "ibis-framework>=7.1.0,<7.2.0dev",
+        "--pre",
+        "ibis-framework>=8.0.0,<9.0.0dev",
     )
     already_installed.add("ibis-framework")
 
diff --git a/setup.py b/setup.py
index 4aa07904f7..516d5b8a19 100644
--- a/setup.py
+++ b/setup.py
@@ -44,8 +44,7 @@
     "google-cloud-iam >=2.12.1",
     "google-cloud-resource-manager >=1.10.3",
     "google-cloud-storage >=2.0.0",
-    # TODO: Relax upper bound once we have fixed unit tests with 7.2.0.
-    "ibis-framework[bigquery] >=7.1.0,<7.2.0dev",
+    "ibis-framework[bigquery] >=8.0.0,<9.0.0dev",
     # TODO: Relax upper bound once we have fixed `system_prerelease` tests.
     "pandas >=1.5.0,<2.1.4",
     "pydata-google-auth >=1.8.2",
@@ -55,7 +54,7 @@
     # Keep sqlglot versions in sync with ibis-framework. This avoids problems
     # where the incorrect version of sqlglot is installed, such as
     # https://github.com/googleapis/python-bigquery-dataframes/issues/315
-    "sqlglot >=19.9.0,<20",
+    "sqlglot >=20.8.0,<=20.11",
     "tabulate >= 0.9",
     "ipywidgets >=7.7.1",
     "humanize >= 4.6.0",
diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt
index 42cc68eb04..c4fed64fbd 100644
--- a/testing/constraints-3.9.txt
+++ b/testing/constraints-3.9.txt
@@ -10,13 +10,13 @@ google-cloud-bigquery-connection==1.12.0
 google-cloud-iam==2.12.1
 google-cloud-resource-manager==1.10.3
 google-cloud-storage==2.0.0
-ibis-framework==7.1.0
+ibis-framework==8.0.0
 pandas==1.5.0
 pydata-google-auth==1.8.2
 requests==2.27.1
 scikit-learn==1.2.2
 sqlalchemy==1.4
-sqlglot==19.9.0
+sqlglot==20.8.0
 tabulate==0.9
 ipywidgets==7.7.1
 humanize==4.6.0
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 8f75534fc6..9f4e138b73 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -157,15 +157,13 @@ def test_tail_with_custom_column_labels(scalars_df_index, scalars_pandas_df_inde
     ],
 )
 def test_df_nlargest(scalars_df_index, scalars_pandas_df_index, keep):
-    bf_result = scalars_df_index.nlargest(
-        3, ["bool_col", "int64_too"], keep=keep
-    ).to_pandas()
+    bf_result = scalars_df_index.nlargest(3, ["bool_col", "int64_too"], keep=keep)
     pd_result = scalars_pandas_df_index.nlargest(
         3, ["bool_col", "int64_too"], keep=keep
     )
 
     pd.testing.assert_frame_equal(
-        bf_result,
+        bf_result.to_pandas(),
         pd_result,
     )
 
@@ -179,11 +177,11 @@ def test_df_nlargest(scalars_df_index, scalars_pandas_df_index, keep):
     ],
 )
 def test_df_nsmallest(scalars_df_index, scalars_pandas_df_index, keep):
-    bf_result = scalars_df_index.nsmallest(6, ["bool_col"], keep=keep).to_pandas()
+    bf_result = scalars_df_index.nsmallest(6, ["bool_col"], keep=keep)
     pd_result = scalars_pandas_df_index.nsmallest(6, ["bool_col"], keep=keep)
 
     pd.testing.assert_frame_equal(
-        bf_result,
+        bf_result.to_pandas(),
         pd_result,
     )
diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py
index 42cbcbbc9f..5f940fd7a5 100644
--- a/tests/unit/test_core.py
+++ b/tests/unit/test_core.py
@@ -118,7 +118,7 @@ def test_arrayvalues_to_ibis_expr_with_concat():
         total_ordering_columns=["col1"],
     )
     expr = value.concat([value])
-    actual = expr._compile_ordered()._to_ibis_expr(ordering_mode="unordered")
+    actual = expr._compile_unordered()._to_ibis_expr()
     assert len(actual.columns) == 3
     # TODO(ashleyxu, b/299631930): test out the union expression
     assert actual.columns[0] == "column_0"
diff --git a/tests/unit/test_remote_function.py b/tests/unit/test_remote_function.py
index 392872a7be..629bc5326a 100644
--- a/tests/unit/test_remote_function.py
+++ b/tests/unit/test_remote_function.py
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ibis.backends.bigquery import datatypes as bq_types
 from ibis.expr import datatypes as ibis_types
 
 import bigframes.dtypes
+import third_party.bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
 
 
 def test_supported_types_correspond():
@@ -24,7 +24,7 @@ def test_supported_types_correspond():
     ibis_types_from_python = {
         ibis_types.dtype(t) for t in bigframes.dtypes.SUPPORTED_IO_PYTHON_TYPES
     }
     ibis_types_from_bigquery = {
-        bq_types.BigQueryType.to_ibis(tk)
+        third_party_ibis_bqtypes.BigQueryType.to_ibis(tk)
         for tk in bigframes.dtypes.SUPPORTED_IO_BIGQUERY_TYPEKINDS
     }
diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/datatypes.py b/third_party/bigframes_vendored/ibis/backends/bigquery/datatypes.py
new file mode 100644
index 0000000000..e7200cbf2a
--- /dev/null
+++ b/third_party/bigframes_vendored/ibis/backends/bigquery/datatypes.py
@@ -0,0 +1,176 @@
+# Contains code from
+# https://github.com/ibis-project/ibis/blob/697d325f13bdf2746a50e86204eb8834b1710bd6/ibis/backends/bigquery/datatypes.py
+
+from __future__ import annotations
+
+import google.cloud.bigquery as bq
+import ibis
+import ibis.expr.datatypes as dt
+import ibis.expr.schema as sch
+from ibis.formats import SchemaMapper, TypeMapper
+import sqlglot as sg
+
+_from_bigquery_types = {
+    "INT64": dt.Int64,
+    "INTEGER": dt.Int64,
+    "FLOAT": dt.Float64,
+    "FLOAT64": dt.Float64,
+    "BOOL": dt.Boolean,
+    "BOOLEAN": dt.Boolean,
+    "STRING": dt.String,
+    "DATE": dt.Date,
+    "TIME": dt.Time,
+    "BYTES": dt.Binary,
+    "JSON": dt.JSON,
+}
+
+
+class BigQueryType(TypeMapper):
+    @classmethod
+    def to_ibis(cls, typ: str, nullable: bool = True) -> dt.DataType:
+        if typ == "DATETIME":
+            return dt.Timestamp(timezone=None, nullable=nullable)
+        elif typ == "TIMESTAMP":
+            return dt.Timestamp(timezone="UTC", nullable=nullable)
+        elif typ == "NUMERIC":
+            return dt.Decimal(38, 9, nullable=nullable)
+        elif typ == "BIGNUMERIC":
+            return dt.Decimal(76, 38, nullable=nullable)
+        elif typ == "GEOGRAPHY":
+            return dt.GeoSpatial(geotype="geography", srid=4326, nullable=nullable)
+        else:
+            try:
+                return _from_bigquery_types[typ](nullable=nullable)
+            except KeyError:
+                raise TypeError(f"Unable to convert BigQuery type to ibis: {typ}")
+
+    @classmethod
+    def from_ibis(cls, dtype: dt.DataType) -> str:
+        if dtype.is_floating():
+            return "FLOAT64"
+        elif dtype.is_uint64():
+            raise TypeError(
+                "Conversion from uint64 to BigQuery integer type (int64) is lossy"
+            )
+        elif dtype.is_integer():
+            return "INT64"
+        elif dtype.is_binary():
+            return "BYTES"
+        elif dtype.is_date():
+            return "DATE"
+        elif dtype.is_timestamp():
+            if dtype.timezone is None:
+                return "DATETIME"
+            elif dtype.timezone == "UTC":
+                return "TIMESTAMP"
+            else:
+                raise TypeError(
+                    "BigQuery does not support timestamps with timezones other than 'UTC'"
+                )
+        elif dtype.is_decimal():
+            if (dtype.precision, dtype.scale) == (76, 38):
+                return "BIGNUMERIC"
+            if (dtype.precision, dtype.scale) in [(38, 9), (None, None)]:
+                return "NUMERIC"
+            raise TypeError(
+                "BigQuery only supports decimal types with precision of 38 and "
+                f"scale of 9 (NUMERIC) or precision of 76 and scale of 38 (BIGNUMERIC). "
+                f"Current precision: {dtype.precision}. Current scale: {dtype.scale}"
+            )
+        elif dtype.is_array():
+            return f"ARRAY<{cls.from_ibis(dtype.value_type)}>"
+        elif dtype.is_struct():
+            fields = (
+                f"{sg.to_identifier(k).sql('bigquery')} {cls.from_ibis(v)}"
+                for k, v in dtype.fields.items()
+            )
+            return "STRUCT<{}>".format(", ".join(fields))
+        elif dtype.is_json():
+            return "JSON"
+        elif dtype.is_geospatial():
+            if (dtype.geotype, dtype.srid) == ("geography", 4326):
+                return "GEOGRAPHY"
+            raise TypeError(
+                "BigQuery geography uses points on WGS84 reference ellipsoid."
+                f"Current geotype: {dtype.geotype}, Current srid: {dtype.srid}"
+            )
+        elif dtype.is_map():
+            raise NotImplementedError("Maps are not supported in BigQuery")
+        else:
+            return str(dtype).upper()
+
+
+class BigQuerySchema(SchemaMapper):
+    @classmethod
+    def from_ibis(cls, schema: sch.Schema) -> list[bq.SchemaField]:
+        schema_fields = []
+
+        for name, typ in ibis.schema(schema).items():
+            if typ.is_array():
+                value_type = typ.value_type
+                if value_type.is_array():
+                    raise TypeError("Nested arrays are not supported in BigQuery")
+
+                is_struct = value_type.is_struct()
+
+                field_type = (
+                    "RECORD" if is_struct else BigQueryType.from_ibis(typ.value_type)
+                )
+                mode = "REPEATED"
+                fields = cls.from_ibis(ibis.schema(getattr(value_type, "fields", {})))
+            elif typ.is_struct():
+                field_type = "RECORD"
+                mode = "NULLABLE" if typ.nullable else "REQUIRED"
+                fields = cls.from_ibis(ibis.schema(typ.fields))
+            else:
+                field_type = BigQueryType.from_ibis(typ)
+                mode = "NULLABLE" if typ.nullable else "REQUIRED"
+                fields = []
+
+            schema_fields.append(
+                bq.SchemaField(name, field_type=field_type, mode=mode, fields=fields)
+            )
+        return schema_fields
+
+    @classmethod
+    def _dtype_from_bigquery_field(cls, field: bq.SchemaField) -> dt.DataType:
+        typ = field.field_type
+        if typ == "RECORD":
+            assert field.fields, "RECORD fields are empty"
+            fields = {f.name: cls._dtype_from_bigquery_field(f) for f in field.fields}
+            dtype = dt.Struct(fields)
+        else:
+            dtype = BigQueryType.to_ibis(typ)
+
+        mode = field.mode
+        if mode == "NULLABLE":
+            return dtype.copy(nullable=True)
+        elif mode == "REQUIRED":
+            return dtype.copy(nullable=False)
+        elif mode == "REPEATED":
+            # arrays with NULL elements aren't supported
+            return dt.Array(dtype.copy(nullable=False))
+        else:
+            raise TypeError(f"Unknown BigQuery field.mode: {mode}")
+
+    @classmethod
+    def to_ibis(cls, fields: list[bq.SchemaField]) -> sch.Schema:
+        return sch.Schema({f.name: cls._dtype_from_bigquery_field(f) for f in fields})
+
+
+# TODO(kszucs): we can eliminate this function by making dt.DataType traversible
+# using ibis.common.graph.Node, similarly to how we traverse ops.Node instances:
+# node.find(types)
+def spread_type(dt: dt.DataType):
+    """Returns a generator that contains all the types in the given type.
+
+    For complex types like set and array, it returns the types of the elements.
+ """ + if dt.is_array(): + yield from spread_type(dt.value_type) + elif dt.is_struct(): + for type_ in dt.types: + yield from spread_type(type_) + elif dt.is_map(): + raise NotImplementedError("Maps are not supported in BigQuery") + yield dt