diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst
index f6315ea894e62..fcf68522bd338 100644
--- a/doc/source/user_guide/io.rst
+++ b/doc/source/user_guide/io.rst
@@ -4648,10 +4648,10 @@ Several caveats.
 * Index level names, if specified, must be strings.
 * In the ``pyarrow`` engine, categorical dtypes for non-string types can be serialized to parquet, but will de-serialize as their primitive dtype.
 * The ``pyarrow`` engine preserves the ``ordered`` flag of categorical dtypes with string types. ``fastparquet`` does not preserve the ``ordered`` flag.
-* Non supported types include ``Period`` and actual Python object types. These will raise a helpful error message
-  on an attempt at serialization.
+* Unsupported types include ``Interval`` and actual Python object types. These will raise a helpful error message
+  on an attempt at serialization. The ``Period`` type is supported with pyarrow >= 0.16.0.
 * The ``pyarrow`` engine preserves extension data types such as the nullable integer and string data
-  type (requiring pyarrow >= 1.0.0, and requiring the extension type to implement the needed protocols,
+  type (requiring pyarrow >= 0.16.0, and requiring the extension type to implement the needed protocols,
   see the :ref:`extension types documentation <extending.extension.arrow>`).

 You can specify an ``engine`` to direct the serialization. This can be one of ``pyarrow``, or ``fastparquet``, or ``auto``.
diff --git a/doc/source/whatsnew/v1.0.0.rst b/doc/source/whatsnew/v1.0.0.rst
index 9d86921bc3be4..8024284acc33d 100755
--- a/doc/source/whatsnew/v1.0.0.rst
+++ b/doc/source/whatsnew/v1.0.0.rst
@@ -204,9 +204,9 @@ Other enhancements
 - Added ``encoding`` argument to :func:`DataFrame.to_html` for non-ascii text (:issue:`28663`)
 - :meth:`Styler.background_gradient` now accepts ``vmin`` and ``vmax`` arguments (:issue:`12145`)
 - :meth:`Styler.format` added the ``na_rep`` parameter to help format the missing values (:issue:`21527`, :issue:`28358`)
-- Roundtripping DataFrames with nullable integer or string data types to parquet
+- Roundtripping DataFrames with nullable integer, string and period data types to parquet
   (:meth:`~DataFrame.to_parquet` / :func:`read_parquet`) using the `'pyarrow'` engine
-  now preserve those data types with pyarrow >= 1.0.0 (:issue:`20612`).
+  now preserves those data types with pyarrow >= 0.16.0 (:issue:`20612`, :issue:`28371`).
 - The ``partition_cols`` argument in :meth:`DataFrame.to_parquet` now accepts a string (:issue:`27117`)
 - :func:`pandas.read_json` now parses ``NaN``, ``Infinity`` and ``-Infinity`` (:issue:`12213`)
 - The ``pandas.np`` submodule is now deprecated. Import numpy directly instead (:issue:`30296`)
@@ -220,7 +220,6 @@ Other enhancements
 - The ``pandas.datetime`` class is now deprecated. Import from ``datetime`` instead (:issue:`30296`)
 - :meth:`Timestamp.fromisocalendar` is now compatible with python 3.8 and above (:issue:`28115`)
-

 Build Changes
 ^^^^^^^^^^^^^
diff --git a/pandas/core/arrays/_arrow_utils.py b/pandas/core/arrays/_arrow_utils.py
new file mode 100644
index 0000000000000..e0d33bebeb421
--- /dev/null
+++ b/pandas/core/arrays/_arrow_utils.py
@@ -0,0 +1,124 @@
+from distutils.version import LooseVersion
+import json
+
+import numpy as np
+import pyarrow
+
+from pandas.core.arrays.interval import _VALID_CLOSED
+
+_pyarrow_version_ge_015 = LooseVersion(pyarrow.__version__) >= LooseVersion("0.15")
+
+
+def pyarrow_array_to_numpy_and_mask(arr, dtype):
+    """
+    Convert a primitive pyarrow.Array to a numpy array and boolean mask based
+    on the buffers of the Array.
+
+    Parameters
+    ----------
+    arr : pyarrow.Array
+    dtype : numpy.dtype
+
+    Returns
+    -------
+    (data, mask)
+        Tuple of two numpy arrays with the raw data (with specified dtype) and
+        a boolean mask (validity mask, so False means missing)
+    """
+    buflist = arr.buffers()
+    data = np.frombuffer(buflist[1], dtype=dtype)[arr.offset : arr.offset + len(arr)]
+    bitmask = buflist[0]
+    if bitmask is not None:
+        mask = pyarrow.BooleanArray.from_buffers(
+            pyarrow.bool_(), len(arr), [None, bitmask]
+        )
+        mask = np.asarray(mask)
+    else:
+        mask = np.ones(len(arr), dtype=bool)
+    return data, mask
+
+
+if _pyarrow_version_ge_015:
+    # the pyarrow extension types are only available for pyarrow 0.15+
+
+    class ArrowPeriodType(pyarrow.ExtensionType):
+        def __init__(self, freq):
+            # attributes need to be set first before calling
+            # super init (as that calls serialize)
+            self._freq = freq
+            pyarrow.ExtensionType.__init__(self, pyarrow.int64(), "pandas.period")
+
+        @property
+        def freq(self):
+            return self._freq
+
+        def __arrow_ext_serialize__(self):
+            metadata = {"freq": self.freq}
+            return json.dumps(metadata).encode()
+
+        @classmethod
+        def __arrow_ext_deserialize__(cls, storage_type, serialized):
+            metadata = json.loads(serialized.decode())
+            return ArrowPeriodType(metadata["freq"])
+
+        def __eq__(self, other):
+            if isinstance(other, pyarrow.BaseExtensionType):
+                return type(self) == type(other) and self.freq == other.freq
+            else:
+                return NotImplemented
+
+        def __hash__(self):
+            return hash((str(self), self.freq))
+
+    # register the type with a dummy instance
+    _period_type = ArrowPeriodType("D")
+    pyarrow.register_extension_type(_period_type)
+
+    class ArrowIntervalType(pyarrow.ExtensionType):
+        def __init__(self, subtype, closed):
+            # attributes need to be set first before calling
+            # super init (as that calls serialize)
+            assert closed in _VALID_CLOSED
+            self._closed = closed
+            if not isinstance(subtype, pyarrow.DataType):
+                subtype = pyarrow.type_for_alias(str(subtype))
+            self._subtype = subtype
+
+            storage_type = pyarrow.struct([("left", subtype), ("right", subtype)])
+            pyarrow.ExtensionType.__init__(self, storage_type, "pandas.interval")
+
+        @property
+        def subtype(self):
+            return self._subtype
+
+        @property
+        def closed(self):
+            return self._closed
+
+        def __arrow_ext_serialize__(self):
+            metadata = {"subtype": str(self.subtype), "closed": self.closed}
+            return json.dumps(metadata).encode()
+
+        @classmethod
+        def __arrow_ext_deserialize__(cls, storage_type, serialized):
+            metadata = json.loads(serialized.decode())
+            subtype = pyarrow.type_for_alias(metadata["subtype"])
+            closed = metadata["closed"]
+            return ArrowIntervalType(subtype, closed)
+
+        def __eq__(self, other):
+            if isinstance(other, pyarrow.BaseExtensionType):
+                return (
+                    type(self) == type(other)
+                    and self.subtype == other.subtype
+                    and self.closed == other.closed
+                )
+            else:
+                return NotImplemented
+
+        def __hash__(self):
+            return hash((str(self), str(self.subtype), self.closed))
+
+    # register the type with a dummy instance
+    _interval_type = ArrowIntervalType(pyarrow.int64(), "left")
+    pyarrow.register_extension_type(_interval_type)
diff --git a/pandas/core/arrays/integer.py b/pandas/core/arrays/integer.py
index d63692c5ba972..5b541ee561688 100644
--- a/pandas/core/arrays/integer.py
+++ b/pandas/core/arrays/integer.py
@@ -94,6 +94,7 @@ def construct_array_type(cls):
     def __from_arrow__(self, array):
         """Construct IntegerArray from passed pyarrow Array/ChunkedArray"""
         import pyarrow
+        from pandas.core.arrays._arrow_utils import pyarrow_array_to_numpy_and_mask

         if isinstance(array, pyarrow.Array):
             chunks = [array]
@@ -103,18 +104,7 @@ def __from_arrow__(self, array):

         results = []
         for arr in chunks:
-            buflist = arr.buffers()
-            data = np.frombuffer(buflist[1], dtype=self.type)[
-                arr.offset : arr.offset + len(arr)
-            ]
-            bitmask = buflist[0]
-            if bitmask is not None:
-                mask = pyarrow.BooleanArray.from_buffers(
-                    pyarrow.bool_(), len(arr), [None, bitmask]
-                )
-                mask = np.asarray(mask)
-            else:
-                mask = np.ones(len(arr), dtype=bool)
+            data, mask = pyarrow_array_to_numpy_and_mask(arr, dtype=self.type)
             int_arr = IntegerArray(data.copy(), ~mask, copy=False)
             results.append(int_arr)

diff --git a/pandas/core/arrays/interval.py b/pandas/core/arrays/interval.py
index 75dd00104db1b..e6cfca74048e7 100644
--- a/pandas/core/arrays/interval.py
+++ b/pandas/core/arrays/interval.py
@@ -1081,6 +1081,59 @@ def __array__(self, dtype=None):
             result[i] = Interval(left[i], right[i], closed)
         return result

+    def __arrow_array__(self, type=None):
+        """
+        Convert myself into a pyarrow Array.
+        """
+        import pyarrow
+        from pandas.core.arrays._arrow_utils import ArrowIntervalType
+
+        try:
+            subtype = pyarrow.from_numpy_dtype(self.dtype.subtype)
+        except TypeError:
+            raise TypeError(
+                "Conversion to arrow with subtype '{}' "
+                "is not supported".format(self.dtype.subtype)
+            )
+        interval_type = ArrowIntervalType(subtype, self.closed)
+        storage_array = pyarrow.StructArray.from_arrays(
+            [
+                pyarrow.array(self.left, type=subtype, from_pandas=True),
+                pyarrow.array(self.right, type=subtype, from_pandas=True),
+            ],
+            names=["left", "right"],
+        )
+        mask = self.isna()
+        if mask.any():
+            # if there are missing values, set validity bitmap also on the array level
+            null_bitmap = pyarrow.array(~mask).buffers()[1]
+            storage_array = pyarrow.StructArray.from_buffers(
+                storage_array.type,
+                len(storage_array),
+                [null_bitmap],
+                children=[storage_array.field(0), storage_array.field(1)],
+            )
+
+        if type is not None:
+            if type.equals(interval_type.storage_type):
+                return storage_array
+            elif isinstance(type, ArrowIntervalType):
+                # ensure we have the same subtype and closed attributes
+                if not type.equals(interval_type):
+                    raise TypeError(
+                        "Not supported to convert IntervalArray to type with "
+                        "different 'subtype' ({0} vs {1}) and 'closed' ({2} vs {3}) "
+                        "attributes".format(
+                            self.dtype.subtype, type.subtype, self.closed, type.closed
+                        )
+                    )
+            else:
+                raise TypeError(
+                    "Not supported to convert IntervalArray to '{0}' type".format(type)
+                )
+
+        return pyarrow.ExtensionArray.from_storage(interval_type, storage_array)
+
     _interval_shared_docs[
         "to_tuples"
     ] = """
diff --git a/pandas/core/arrays/period.py b/pandas/core/arrays/period.py
index d1c574adeb236..b5fa3206055cd 100644
--- a/pandas/core/arrays/period.py
+++ b/pandas/core/arrays/period.py
@@ -283,6 +283,32 @@ def __array__(self, dtype=None):
         # overriding DatetimelikeArray
         return np.array(list(self), dtype=object)

+    def __arrow_array__(self, type=None):
+        """
+        Convert myself into a pyarrow Array.
+        """
+        import pyarrow
+        from pandas.core.arrays._arrow_utils import ArrowPeriodType
+
+        if type is not None:
+            if pyarrow.types.is_integer(type):
+                return pyarrow.array(self._data, mask=self.isna(), type=type)
+            elif isinstance(type, ArrowPeriodType):
+                # ensure we have the same freq
+                if self.freqstr != type.freq:
+                    raise TypeError(
+                        "Not supported to convert PeriodArray to array with different"
+                        " 'freq' ({0} vs {1})".format(self.freqstr, type.freq)
+                    )
+            else:
+                raise TypeError(
+                    "Not supported to convert PeriodArray to '{0}' type".format(type)
+                )
+
+        period_type = ArrowPeriodType(self.freqstr)
+        storage_array = pyarrow.array(self._data, mask=self.isna(), type="int64")
+        return pyarrow.ExtensionArray.from_storage(period_type, storage_array)
+
     # --------------------------------------------------------------------
     # Vectorized analogues of Period properties
diff --git a/pandas/core/dtypes/dtypes.py b/pandas/core/dtypes/dtypes.py
index eed4514baa817..1df7d9028171d 100644
--- a/pandas/core/dtypes/dtypes.py
+++ b/pandas/core/dtypes/dtypes.py
@@ -950,6 +950,26 @@ def construct_array_type(cls):

         return PeriodArray

+    def __from_arrow__(self, array):
+        """Construct PeriodArray from pyarrow Array/ChunkedArray."""
+        import pyarrow
+        from pandas.core.arrays import PeriodArray
+        from pandas.core.arrays._arrow_utils import pyarrow_array_to_numpy_and_mask
+
+        if isinstance(array, pyarrow.Array):
+            chunks = [array]
+        else:
+            chunks = array.chunks
+
+        results = []
+        for arr in chunks:
+            data, mask = pyarrow_array_to_numpy_and_mask(arr, dtype="int64")
+            parr = PeriodArray(data.copy(), freq=self.freq, copy=False)
+            parr[~mask] = NaT
+            results.append(parr)
+
+        return PeriodArray._concat_same_type(results)
+

 @register_extension_dtype
 class IntervalDtype(PandasExtensionDtype):
@@ -1121,3 +1141,22 @@ def is_dtype(cls, dtype) -> bool:
             else:
                 return False
         return super().is_dtype(dtype)
+
+    def __from_arrow__(self, array):
+        """Construct IntervalArray from pyarrow Array/ChunkedArray."""
+        import pyarrow
+        from pandas.core.arrays import IntervalArray
+
+        if isinstance(array, pyarrow.Array):
+            chunks = [array]
+        else:
+            chunks = array.chunks
+
+        results = []
+        for arr in chunks:
+            left = np.asarray(arr.storage.field("left"), dtype=self.subtype)
+            right = np.asarray(arr.storage.field("right"), dtype=self.subtype)
+            iarr = IntervalArray.from_arrays(left, right, closed=array.type.closed)
+            results.append(iarr)
+
+        return IntervalArray._concat_same_type(results)
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index f68347f042086..3a686a1a3b122 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -76,6 +76,9 @@ def __init__(self):
         )
         import pyarrow.parquet

+        # import utils to register the pyarrow extension types
+        import pandas.core.arrays._arrow_utils  # noqa
+
         self.api = pyarrow

     def write(
diff --git a/pandas/tests/arrays/interval/test_interval.py b/pandas/tests/arrays/interval/test_interval.py
index 82db14d9eb135..e046d87780bb4 100644
--- a/pandas/tests/arrays/interval/test_interval.py
+++ b/pandas/tests/arrays/interval/test_interval.py
@@ -1,6 +1,8 @@
 import numpy as np
 import pytest

+import pandas.util._test_decorators as td
+
 import pandas as pd
 from pandas import (
     Index,
@@ -103,3 +105,110 @@ def test_repr():
         "Length: 2, closed: right, dtype: interval[int64]"
     )
     assert result == expected
+
+
+# ----------------------------------------------------------------------------
+# Arrow interaction
+
+
+pyarrow_skip = td.skip_if_no("pyarrow", min_version="0.15.1.dev")
+
+
+@pyarrow_skip
+def test_arrow_extension_type():
+    import pyarrow as pa
+    from pandas.core.arrays._arrow_utils import ArrowIntervalType
+
+    p1 = ArrowIntervalType(pa.int64(), "left")
+    p2 = ArrowIntervalType(pa.int64(), "left")
+    p3 = ArrowIntervalType(pa.int64(), "right")
+
+    assert p1.closed == "left"
+    assert p1 == p2
+    assert not p1 == p3
+    assert hash(p1) == hash(p2)
+    assert not hash(p1) == hash(p3)
+
+
+@pyarrow_skip
+def test_arrow_array():
+    import pyarrow as pa
+    from pandas.core.arrays._arrow_utils import ArrowIntervalType
+
+    intervals = pd.interval_range(1, 5, freq=1).array
+
+    result = pa.array(intervals)
+    assert isinstance(result.type, ArrowIntervalType)
+    assert result.type.closed == intervals.closed
+    assert result.type.subtype == pa.int64()
+    assert result.storage.field("left").equals(pa.array([1, 2, 3, 4], type="int64"))
+    assert result.storage.field("right").equals(pa.array([2, 3, 4, 5], type="int64"))
+
+    expected = pa.array([{"left": i, "right": i + 1} for i in range(1, 5)])
+    assert result.storage.equals(expected)
+
+    # convert to its storage type
+    result = pa.array(intervals, type=expected.type)
+    assert result.equals(expected)
+
+    # unsupported conversions
+    with pytest.raises(TypeError):
+        pa.array(intervals, type="float64")
+
+    with pytest.raises(TypeError, match="different 'subtype'"):
+        pa.array(intervals, type=ArrowIntervalType(pa.float64(), "left"))
+
+
+@pyarrow_skip
+def test_arrow_array_missing():
+    import pyarrow as pa
+    from pandas.core.arrays._arrow_utils import ArrowIntervalType
+
+    arr = IntervalArray.from_breaks([0, 1, 2, 3])
+    arr[1] = None
+
+    result = pa.array(arr)
+    assert isinstance(result.type, ArrowIntervalType)
+    assert result.type.closed == arr.closed
+    assert result.type.subtype == pa.float64()
+
+    # fields have missing values (not NaN)
+    left = pa.array([0.0, None, 2.0], type="float64")
+    right = pa.array([1.0, None, 3.0], type="float64")
+    assert result.storage.field("left").equals(left)
+    assert result.storage.field("right").equals(right)
+
+    # structarray itself also has missing values on the array level
+    vals = [
+        {"left": 0.0, "right": 1.0},
+        {"left": None, "right": None},
+        {"left": 2.0, "right": 3.0},
+    ]
+    expected = pa.StructArray.from_pandas(vals, mask=np.array([False, True, False]))
+    assert result.storage.equals(expected)
+
+
+@pyarrow_skip
+@pytest.mark.parametrize(
+    "breaks",
+    [[0, 1, 2, 3], pd.date_range("2017", periods=4, freq="D")],
+    ids=["int", "datetime64[ns]"],
+)
+def test_arrow_table_roundtrip(breaks):
+    import pyarrow as pa
+    from pandas.core.arrays._arrow_utils import ArrowIntervalType
+
+    arr = IntervalArray.from_breaks(breaks)
+    arr[1] = None
+    df = pd.DataFrame({"a": arr})
+
+    table = pa.table(df)
+    assert isinstance(table.field("a").type, ArrowIntervalType)
+    result = table.to_pandas()
+    assert isinstance(result["a"].dtype, pd.IntervalDtype)
+    tm.assert_frame_equal(result, df)
+
+    table2 = pa.concat_tables([table, table])
+    result = table2.to_pandas()
+    expected = pd.concat([df, df], ignore_index=True)
+    tm.assert_frame_equal(result, expected)
diff --git a/pandas/tests/arrays/test_period.py b/pandas/tests/arrays/test_period.py
index 5068e8d57e1de..1f4351c7e20ee 100644
--- a/pandas/tests/arrays/test_period.py
+++ b/pandas/tests/arrays/test_period.py
@@ -3,6 +3,7 @@

 from pandas._libs.tslibs import iNaT
 from pandas._libs.tslibs.period import IncompatibleFrequency
+import pandas.util._test_decorators as td

 from pandas.core.dtypes.dtypes import PeriodDtype, registry

@@ -323,3 +324,91 @@ def test_min_max_empty(self, skipna):

         result = arr.max(skipna=skipna)
         assert result is pd.NaT
+
+
+# ----------------------------------------------------------------------------
+# Arrow interaction
+
+pyarrow_skip = td.skip_if_no("pyarrow", min_version="0.15.1.dev")
+
+
+@pyarrow_skip
+def test_arrow_extension_type():
+    from pandas.core.arrays._arrow_utils import ArrowPeriodType
+
+    p1 = ArrowPeriodType("D")
+    p2 = ArrowPeriodType("D")
+    p3 = ArrowPeriodType("M")
+
+    assert p1.freq == "D"
+    assert p1 == p2
+    assert not p1 == p3
+    assert hash(p1) == hash(p2)
+    assert not hash(p1) == hash(p3)
+
+
+@pyarrow_skip
+@pytest.mark.parametrize(
+    "data, freq",
+    [
+        (pd.date_range("2017", periods=3), "D"),
+        (pd.date_range("2017", periods=3, freq="A"), "A-DEC"),
+    ],
+)
+def test_arrow_array(data, freq):
+    import pyarrow as pa
+    from pandas.core.arrays._arrow_utils import ArrowPeriodType
+
+    periods = period_array(data, freq=freq)
+    result = pa.array(periods)
+    assert isinstance(result.type, ArrowPeriodType)
+    assert result.type.freq == freq
+    expected = pa.array(periods.asi8, type="int64")
+    assert result.storage.equals(expected)
+
+    # convert to its storage type
+    result = pa.array(periods, type=pa.int64())
+    assert result.equals(expected)
+
+    # unsupported conversions
+    with pytest.raises(TypeError):
+        pa.array(periods, type="float64")
+
+    with pytest.raises(TypeError, match="different 'freq'"):
+        pa.array(periods, type=ArrowPeriodType("T"))
+
+
+@pyarrow_skip
+def test_arrow_array_missing():
+    import pyarrow as pa
+    from pandas.core.arrays._arrow_utils import ArrowPeriodType
+
+    arr = PeriodArray([1, 2, 3], freq="D")
+    arr[1] = pd.NaT
+
+    result = pa.array(arr)
+    assert isinstance(result.type, ArrowPeriodType)
+    assert result.type.freq == "D"
+    expected = pa.array([1, None, 3], type="int64")
+    assert result.storage.equals(expected)
+
+
+@pyarrow_skip
+def test_arrow_table_roundtrip():
+    import pyarrow as pa
+    from pandas.core.arrays._arrow_utils import ArrowPeriodType
+
+    arr = PeriodArray([1, 2, 3], freq="D")
+    arr[1] = pd.NaT
+    df = pd.DataFrame({"a": arr})
+
+    table = pa.table(df)
+    assert isinstance(table.field("a").type, ArrowPeriodType)
+    result = table.to_pandas()
+    assert isinstance(result["a"].dtype, PeriodDtype)
+    tm.assert_frame_equal(result, df)
+
+    table2 = pa.concat_tables([table, table])
+    result = table2.to_pandas()
+    expected = pd.concat([df, df], ignore_index=True)
+    tm.assert_frame_equal(result, expected)
diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index f9bd0df4a9196..d51c712ed5abd 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -443,11 +443,12 @@ def test_duplicate_columns(self, pa):
         self.check_error_on_write(df, pa, ValueError)

     def test_unsupported(self, pa):
-        # period
-        df = pd.DataFrame({"a": pd.period_range("2013", freq="M", periods=3)})
-        # pyarrow 0.11 raises ArrowTypeError
-        # older pyarrows raise ArrowInvalid
-        self.check_error_on_write(df, pa, Exception)
+        if LooseVersion(pyarrow.__version__) < LooseVersion("0.15.1.dev"):
+            # period - will be supported using an extension type with pyarrow 0.16
+            df = pd.DataFrame({"a": pd.period_range("2013", freq="M", periods=3)})
+            # pyarrow 0.11 raises ArrowTypeError
+            # older pyarrows raise ArrowInvalid
+            self.check_error_on_write(df, pa, Exception)

         # timedelta
         df = pd.DataFrame({"a": pd.timedelta_range("1 day", periods=3)})
@@ -550,6 +551,19 @@ def test_additional_extension_arrays(self, pa):
             expected = df.assign(a=df.a.astype("float64"))
             check_round_trip(df, pa, expected=expected)

+    @td.skip_if_no("pyarrow", min_version="0.15.1.dev")
+    def test_additional_extension_types(self, pa):
+        # test additional ExtensionArrays that are supported through the
+        # __arrow_array__ protocol combined with a custom ExtensionType
+        df = pd.DataFrame(
+            {
+                # Arrow does not yet support struct in writing to Parquet (ARROW-1644)
+                # "c": pd.arrays.IntervalArray.from_tuples([(0, 1), (1, 2), (3, 4)]),
+                "d": pd.period_range("2012-01-01", periods=3, freq="D"),
+            }
+        )
+        check_round_trip(df, pa)
+

 class TestParquetFastParquet(Base):
     @td.skip_if_no("fastparquet", min_version="0.3.2")
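
Reviewer note: a minimal usage sketch of the period path, not part of the patch. It assumes pyarrow >= 0.15.1.dev (the same version gate as the tests above); the parquet file name is hypothetical, everything else comes straight from the diff.

    import pandas as pd
    import pyarrow as pa

    # Importing the module registers ArrowPeriodType/ArrowIntervalType with
    # pyarrow, exactly as PyArrowImpl.__init__ does above.
    from pandas.core.arrays._arrow_utils import ArrowPeriodType

    df = pd.DataFrame({"a": pd.period_range("2012-01-01", periods=3, freq="D")})

    # pandas -> Arrow: pa.table() picks up PeriodArray.__arrow_array__, which
    # wraps the int64 ordinals in the registered extension type.
    table = pa.table(df)
    assert isinstance(table.field("a").type, ArrowPeriodType)
    assert table.field("a").type.freq == "D"

    # Arrow -> pandas: to_pandas() dispatches to PeriodDtype.__from_arrow__,
    # so the period dtype survives the conversion.
    assert str(table.to_pandas()["a"].dtype) == "period[D]"

    # Parquet roundtrip: the "freq" metadata from __arrow_ext_serialize__ is
    # stored in the file and restored by __arrow_ext_deserialize__ on read.
    df.to_parquet("periods.parquet", engine="pyarrow")  # hypothetical path
    assert str(pd.read_parquet("periods.parquet")["a"].dtype) == "period[D]"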
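
A second sketch, same assumptions, for the interval side. Writing intervals to Parquet is still blocked upstream (ARROW-1644, per the comment in test_additional_extension_types), so this stops at the Arrow array level and illustrates the two-layer missing-value handling in IntervalArray.__arrow_array__; the null_count assertions are my reading of that code, not taken from the tests.

    import pandas as pd
    import pyarrow as pa

    from pandas.core.arrays._arrow_utils import ArrowIntervalType

    arr = pd.arrays.IntervalArray.from_breaks([0.0, 1.0, 2.0])
    arr[1] = None  # one missing interval

    result = pa.array(arr)  # dispatches to IntervalArray.__arrow_array__
    assert isinstance(result.type, ArrowIntervalType)
    assert result.type.subtype == pa.float64()
    assert result.type.closed == "right"

    # Missing values are encoded twice: as nulls in the struct's "left"/"right"
    # children (via from_pandas=True) and as a validity bitmap on the struct
    # itself (the from_buffers branch guarded by mask.any()).
    assert result.storage.field("left").null_count == 1
    assert result.storage.null_count == 1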