Skip to content

feat: specific pyarrow mappings for decimal, bytes types #283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 21, 2023
14 changes: 12 additions & 2 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value:
raise ValueError(
"Column name {} not in set of values: {}".format(key, self.column_ids)
)
return typing.cast(ibis_types.Value, self._column_names[key])
return typing.cast(
ibis_types.Value,
bigframes.dtypes.ibis_value_to_canonical_type(self._column_names[key]),
)

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
ibis_type = typing.cast(
Expand Down Expand Up @@ -1177,7 +1180,14 @@ def _to_ibis_expr(
# Make sure all dtypes are the "canonical" ones for BigFrames. This is
# important for operations like UNION where the schema must match.
table = self._table.select(
bigframes.dtypes.ibis_value_to_canonical_type(column) for column in columns
bigframes.dtypes.ibis_value_to_canonical_type(
column.resolve(self._table)
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
# public API to refer to Deferred type.
if isinstance(column, ibis.common.deferred.Deferred)
else column
)
for column in columns
)
base_table = table
if self._reduced_predicate is not None:
Expand Down
41 changes: 23 additions & 18 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import bigframes.constants as constants
import third_party.bigframes_vendored.google_cloud_bigquery._pandas_helpers as gcb3p_pandas_helpers
import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops

# Type hints for Pandas dtypes supported by BigQuery DataFrame
Dtype = Union[
Expand Down Expand Up @@ -97,6 +98,15 @@
ibis_dtypes.Timestamp(timezone="UTC"),
pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
),
(ibis_dtypes.binary, pd.ArrowDtype(pa.binary())),
(
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True),
pd.ArrowDtype(pa.decimal128(38, 9)),
),
(
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True),
pd.ArrowDtype(pa.decimal256(76, 38)),
),
)

BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = {
Expand All @@ -112,6 +122,13 @@
ibis_dtypes.time: pa.time64("us"),
ibis_dtypes.Timestamp(timezone=None): pa.timestamp("us"),
ibis_dtypes.Timestamp(timezone="UTC"): pa.timestamp("us", tz="UTC"),
ibis_dtypes.binary: pd.ArrowDtype(pa.binary()),
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): pd.ArrowDtype(
pa.decimal128(38, 9)
),
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): pd.ArrowDtype(
pa.decimal256(76, 38)
),
}

ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}
Expand All @@ -125,10 +142,6 @@
)
IBIS_TO_BIGFRAMES.update(
{
ibis_dtypes.binary: np.dtype("O"),
ibis_dtypes.json: np.dtype("O"),
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): np.dtype("O"),
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): np.dtype("O"),
ibis_dtypes.GeoSpatial(
geotype="geography", srid=4326, nullable=True
): gpd.array.GeometryDtype(),
Expand Down Expand Up @@ -178,7 +191,7 @@ def ibis_dtype_to_bigframes_dtype(
# our IO returns them as objects. Eventually, we should support them as
# ArrowDtype (and update the IO accordingly)
if isinstance(ibis_dtype, ibis_dtypes.Array):
return np.dtype("O")
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))

if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))
Expand Down Expand Up @@ -224,21 +237,13 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
This is useful in cases where multiple types correspond to the same BigFrames dtype.
"""
ibis_type = value.type()
name = value.get_name()
if ibis_type.is_json():
value = vendored_ibis_ops.ToJsonString(value).to_expr()
return value.name(name)
# Allow REQUIRED fields to be joined with NULLABLE fields.
nullable_type = ibis_type.copy(nullable=True)
return value.cast(nullable_type).name(value.get_name())


def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:
"""Converts an Ibis table expression to canonical types.

This is useful in cases where multiple types correspond to the same BigFrames dtype.
"""
casted_columns = []
for column_name in table.columns:
column = typing.cast(ibis_types.Value, table[column_name])
casted_columns.append(ibis_value_to_canonical_type(column))
return table.select(*casted_columns)
return value.cast(nullable_type).name(name)


def arrow_dtype_to_ibis_dtype(arrow_dtype: pa.DataType) -> ibis_dtypes.DataType:
Expand Down
3 changes: 2 additions & 1 deletion tests/system/small/test_dataframe_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def test_load_json(session):
expected = pd.DataFrame(
{
"json_column": ['{"bar":true,"foo":10}'],
}
},
dtype=pd.StringDtype(storage="pyarrow"),
)
expected.index = expected.index.astype("Int64")
pd.testing.assert_series_equal(result.dtypes, expected.dtypes)
Expand Down
24 changes: 18 additions & 6 deletions tests/system/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,28 @@ def convert_pandas_dtypes(df: pd.DataFrame, bytes_col: bool):
df["geography_col"].replace({np.nan: None})
)

# Convert bytes types column.
if bytes_col:
if not isinstance(df["bytes_col"].dtype, pd.ArrowDtype):
df["bytes_col"] = df["bytes_col"].apply(
lambda value: base64.b64decode(value) if not pd.isnull(value) else value
)
arrow_table = pa.Table.from_pandas(
pd.DataFrame(df, columns=["bytes_col"]),
schema=pa.schema([("bytes_col", pa.binary())]),
)
df["bytes_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)["bytes_col"]

# Convert numeric types column.
df["numeric_col"] = df["numeric_col"].apply(
lambda value: decimal.Decimal(str(value)) if value else None # type: ignore
)
if not isinstance(df["numeric_col"].dtype, pd.ArrowDtype):
# Convert numeric types column.
df["numeric_col"] = df["numeric_col"].apply(
lambda value: decimal.Decimal(str(value)) if value else None # type: ignore
)
arrow_table = pa.Table.from_pandas(
pd.DataFrame(df, columns=["numeric_col"]),
schema=pa.schema([("numeric_col", pa.decimal128(38, 9))]),
)
df["numeric_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)[
"numeric_col"
]


def assert_pandas_df_equal_pca_components(actual, expected, **kwargs):
Expand Down
7 changes: 3 additions & 4 deletions tests/unit/test_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
pytest.param(
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True),
np.dtype("O"),
pd.ArrowDtype(pa.decimal256(76, 38)),
id="bignumeric",
),
pytest.param(ibis_dtypes.boolean, pd.BooleanDtype(), id="bool"),
pytest.param(ibis_dtypes.binary, np.dtype("O"), id="bytes"),
pytest.param(ibis_dtypes.binary, pd.ArrowDtype(pa.binary()), id="bytes"),
pytest.param(ibis_dtypes.date, pd.ArrowDtype(pa.date32()), id="date"),
pytest.param(
ibis_dtypes.Timestamp(), pd.ArrowDtype(pa.timestamp("us")), id="datetime"
Expand All @@ -49,10 +49,9 @@
pytest.param(ibis_dtypes.int8, pd.Int64Dtype(), id="int8-as-int64"),
pytest.param(ibis_dtypes.int64, pd.Int64Dtype(), id="int64"),
# TODO(tswast): custom dtype (or at least string dtype) for JSON objects
pytest.param(ibis_dtypes.json, np.dtype("O"), id="json"),
pytest.param(
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True),
np.dtype("O"),
pd.ArrowDtype(pa.decimal128(38, 9)),
id="numeric",
),
pytest.param(
Expand Down